diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala similarity index 83% rename from core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala rename to core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala index b8f5d3a5b02aa..76e3932a9bb91 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala @@ -15,22 +15,22 @@ * limitations under the License. */ -package org.apache.spark.storage +package org.apache.spark.shuffle import java.io.File +import java.nio.ByteBuffer import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConversions._ -import org.apache.spark.Logging +import org.apache.spark.{SparkEnv, SparkConf, Logging} +import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.serializer.Serializer -import org.apache.spark.shuffle.ShuffleManager -import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup +import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup +import org.apache.spark.storage._ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap} import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} -import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.executor.ShuffleWriteMetrics /** A group of writers for a ShuffleMapTask, one writer per reducer. */ private[spark] trait ShuffleWriterGroup { @@ -61,20 +61,18 @@ private[spark] trait ShuffleWriterGroup { * each block stored in each file. In order to find the location of a shuffle block, we search the * files within a ShuffleFileGroups associated with the block's reducer. */ -// TODO: Factor this into a separate class for each ShuffleManager implementation + private[spark] -class ShuffleBlockManager(blockManager: BlockManager, - shuffleManager: ShuffleManager) extends Logging { - def conf = blockManager.conf +class FileShuffleBlockManager(conf: SparkConf) + extends ShuffleBlockManager with Logging { + + private lazy val blockManager = SparkEnv.get.blockManager // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. // TODO: Remove this once the shuffle file consolidation feature is stable. - val consolidateShuffleFiles = + private val consolidateShuffleFiles = conf.getBoolean("spark.shuffle.consolidateFiles", false) - // Are we using sort-based shuffle? - val sortBasedShuffle = shuffleManager.isInstanceOf[SortShuffleManager] - private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024 /** @@ -93,22 +91,11 @@ class ShuffleBlockManager(blockManager: BlockManager, val completedMapTasks = new ConcurrentLinkedQueue[Int]() } - type ShuffleId = Int private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState] private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf) - /** - * Register a completed map without getting a ShuffleWriterGroup. Used by sort-based shuffle - * because it just writes a single file by itself. 
- */ - def addCompletedMap(shuffleId: Int, mapId: Int, numBuckets: Int): Unit = { - shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets)) - val shuffleState = shuffleStates(shuffleId) - shuffleState.completedMapTasks.add(mapId) - } - /** * Get a ShuffleWriterGroup for the given map task, which will register it as complete * when the writers are closed successfully @@ -181,17 +168,30 @@ class ShuffleBlockManager(blockManager: BlockManager, /** * Returns the physical file segment in which the given BlockId is located. - * This function should only be called if shuffle file consolidation is enabled, as it is - * an error condition if we don't find the expected block. */ - def getBlockLocation(id: ShuffleBlockId): FileSegment = { - // Search all file groups associated with this shuffle. - val shuffleState = shuffleStates(id.shuffleId) - for (fileGroup <- shuffleState.allFileGroups) { - val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId) - if (segment.isDefined) { return segment.get } + private def getBlockLocation(id: ShuffleBlockId): FileSegment = { + if (consolidateShuffleFiles) { + // Search all file groups associated with this shuffle. + val shuffleState = shuffleStates(id.shuffleId) + val iter = shuffleState.allFileGroups.iterator + while (iter.hasNext) { + val segment = iter.next.getFileSegmentFor(id.mapId, id.reduceId) + if (segment.isDefined) { return segment.get } + } + throw new IllegalStateException("Failed to find shuffle block: " + id) + } else { + val file = blockManager.diskBlockManager.getFile(id) + new FileSegment(file, 0, file.length()) } - throw new IllegalStateException("Failed to find shuffle block: " + id) + } + + override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = { + val segment = getBlockLocation(blockId) + blockManager.diskStore.getBytes(segment) + } + + override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] = { + Left(getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])) } /** Remove all the blocks / files and metadata related to a particular shuffle. */ @@ -207,14 +207,7 @@ class ShuffleBlockManager(blockManager: BlockManager, private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = { shuffleStates.get(shuffleId) match { case Some(state) => - if (sortBasedShuffle) { - // There's a single block ID for each map, plus an index file for it - for (mapId <- state.completedMapTasks) { - val blockId = new ShuffleBlockId(shuffleId, mapId, 0) - blockManager.diskBlockManager.getFile(blockId).delete() - blockManager.diskBlockManager.getFile(blockId.name + ".index").delete() - } - } else if (consolidateShuffleFiles) { + if (consolidateShuffleFiles) { for (fileGroup <- state.allFileGroups; file <- fileGroup.files) { file.delete() } @@ -240,13 +233,13 @@ class ShuffleBlockManager(blockManager: BlockManager, shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId)) } - def stop() { + override def stop() { metadataCleaner.cancel() } } private[spark] -object ShuffleBlockManager { +object FileShuffleBlockManager { /** * A group of shuffle files, one per reducer. * A particular mapper will be assigned a single ShuffleFileGroup to write its output to. 
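The net effect of the changes above is that shuffle block resolution moves behind the pluggable ShuffleBlockManager trait introduced later in this patch: BlockManager no longer asks DiskBlockManager for a shuffle block's location but delegates to the ShuffleBlockManager owned by the active ShuffleManager, and both the hash- and sort-based implementations are reached through the same interface. A minimal sketch of that lookup path, assuming the trait and the SparkEnv wiring added below (the ShuffleBlockReader object and its readShuffleBlock helper are illustrative only, not part of the change):

    import java.nio.ByteBuffer

    import org.apache.spark.SparkEnv
    import org.apache.spark.storage.{FileSegment, ShuffleBlockId}

    // Illustrative helper (not part of this patch): callers resolve shuffle
    // blocks through the ShuffleManager's ShuffleBlockManager, never by
    // touching index or data files directly.
    object ShuffleBlockReader {
      def readShuffleBlock(id: ShuffleBlockId): Either[FileSegment, ByteBuffer] = {
        val shuffleBlockManager = SparkEnv.get.shuffleManager.shuffleBlockManager
        shuffleBlockManager.getBlockData(id)
      }
    }
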
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala new file mode 100644 index 0000000000000..8bb9efc46cc58 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io._ +import java.nio.ByteBuffer + +import org.apache.spark.SparkEnv +import org.apache.spark.storage._ + +/** + * Create and maintain the shuffle blocks' mapping between logic block and physical file location. + * Data of shuffle blocks from the same map task are stored in a single consolidated data file. + * The offsets of the data blocks in the data file are stored in a separate index file. + * + * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data" + * as the filename postfix for data file, and ".index" as the filename postfix for index file. + * + */ +private[spark] +class IndexShuffleBlockManager extends ShuffleBlockManager { + + private lazy val blockManager = SparkEnv.get.blockManager + + /** + * Mapping to a single shuffleBlockId with reduce ID 0. + * */ + def consolidateId(shuffleId: Int, mapId: Int): ShuffleBlockId = { + ShuffleBlockId(shuffleId, mapId, 0) + } + + def getDataFile(shuffleId: Int, mapId: Int): File = { + blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0)) + } + + private def getIndexFile(shuffleId: Int, mapId: Int): File = { + blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0)) + } + + /** + * Remove data file and index file that contain the output data from one map. + * */ + def removeDataByMap(shuffleId: Int, mapId: Int): Unit = { + var file = getDataFile(shuffleId, mapId) + if (file.exists()) { + file.delete() + } + + file = getIndexFile(shuffleId, mapId) + if (file.exists()) { + file.delete() + } + } + + /** + * Write an index file with the offsets of each block, plus a final offset at the end for the + * end of the output file. This will be used by getBlockLocation to figure out where each block + * begins and ends. + * */ + def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]) = { + val indexFile = getIndexFile(shuffleId, mapId) + val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))) + try { + // We take in lengths of each block, need to convert it to offsets. + var offset = 0L + out.writeLong(offset) + + for (length <- lengths) { + offset += length + out.writeLong(offset) + } + } finally { + out.close() + } + } + + /** + * Get the location of a block in a map output file. Uses the index file we create for it. 
+ * */ + private def getBlockLocation(blockId: ShuffleBlockId): FileSegment = { + // The block is actually going to be a range of a single map output file for this map, so + // find out the consolidated file, then the offset within that from our index + val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) + + val in = new DataInputStream(new FileInputStream(indexFile)) + try { + in.skip(blockId.reduceId * 8) + val offset = in.readLong() + val nextOffset = in.readLong() + new FileSegment(getDataFile(blockId.shuffleId, blockId.mapId), offset, nextOffset - offset) + } finally { + in.close() + } + } + + override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = { + val segment = getBlockLocation(blockId) + blockManager.diskStore.getBytes(segment) + } + + override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] = { + Left(getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])) + } + + override def stop() = {} +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala new file mode 100644 index 0000000000000..4240580250046 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.nio.ByteBuffer + +import org.apache.spark.storage.{FileSegment, ShuffleBlockId} + +private[spark] +trait ShuffleBlockManager { + type ShuffleId = Int + + /** + * Get shuffle block data managed by the local ShuffleBlockManager. + * @return Some(ByteBuffer) if block found, otherwise None. + */ + def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] + + def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] + + def stop(): Unit +} + diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index 9c859b8b4a118..801ae54086053 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ -49,8 +49,13 @@ private[spark] trait ShuffleManager { endPartition: Int, context: TaskContext): ShuffleReader[K, C] - /** Remove a shuffle's metadata from the ShuffleManager. */ - def unregisterShuffle(shuffleId: Int) + /** + * Remove a shuffle's metadata from the ShuffleManager. + * @return true if the metadata removed successfully, otherwise false. + */ + def unregisterShuffle(shuffleId: Int): Boolean + + def shuffleBlockManager: ShuffleBlockManager /** Shut down this ShuffleManager. 
*/ def stop(): Unit diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala index df98d18fa8193..62e0629b34400 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala @@ -25,6 +25,9 @@ import org.apache.spark.shuffle._ * mapper (possibly reusing these across waves of tasks). */ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager { + + private val fileShuffleBlockManager = new FileShuffleBlockManager(conf) + /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */ override def registerShuffle[K, V, C]( shuffleId: Int, @@ -49,12 +52,21 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager /** Get a writer for a given partition. Called on executors by map tasks. */ override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) : ShuffleWriter[K, V] = { - new HashShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context) + new HashShuffleWriter( + shuffleBlockManager, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context) } /** Remove a shuffle's metadata from the ShuffleManager. */ - override def unregisterShuffle(shuffleId: Int): Unit = {} + override def unregisterShuffle(shuffleId: Int): Boolean = { + shuffleBlockManager.removeShuffle(shuffleId) + } + + override def shuffleBlockManager: FileShuffleBlockManager = { + fileShuffleBlockManager + } /** Shut down this ShuffleManager. */ - override def stop(): Unit = {} + override def stop(): Unit = { + shuffleBlockManager.stop() + } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 51e454d9313c9..4b9454d75abb7 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -17,14 +17,15 @@ package org.apache.spark.shuffle.hash -import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleWriter} -import org.apache.spark.{Logging, MapOutputTracker, SparkEnv, TaskContext} -import org.apache.spark.storage.{BlockObjectWriter} -import org.apache.spark.serializer.Serializer +import org.apache.spark._ import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.scheduler.MapStatus +import org.apache.spark.serializer.Serializer +import org.apache.spark.shuffle._ +import org.apache.spark.storage.BlockObjectWriter private[spark] class HashShuffleWriter[K, V]( + shuffleBlockManager: FileShuffleBlockManager, handle: BaseShuffleHandle[K, V, _], mapId: Int, context: TaskContext) @@ -43,7 +44,6 @@ private[spark] class HashShuffleWriter[K, V]( metrics.shuffleWriteMetrics = Some(writeMetrics) private val blockManager = SparkEnv.get.blockManager - private val shuffleBlockManager = blockManager.shuffleBlockManager private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null)) private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser, writeMetrics) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 6dcca47ea7c0c..b727438ae7e47 100644 --- 
a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -17,14 +17,17 @@ package org.apache.spark.shuffle.sort -import java.io.{DataInputStream, FileInputStream} +import java.util.concurrent.ConcurrentHashMap +import org.apache.spark.{SparkConf, TaskContext, ShuffleDependency} import org.apache.spark.shuffle._ -import org.apache.spark.{TaskContext, ShuffleDependency} import org.apache.spark.shuffle.hash.HashShuffleReader -import org.apache.spark.storage.{DiskBlockManager, FileSegment, ShuffleBlockId} -private[spark] class SortShuffleManager extends ShuffleManager { +private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager { + + private val indexShuffleBlockManager = new IndexShuffleBlockManager() + private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]() + /** * Register a shuffle with the manager and obtain a handle for it to pass to tasks. */ @@ -52,29 +55,29 @@ private[spark] class SortShuffleManager extends ShuffleManager { /** Get a writer for a given partition. Called on executors by map tasks. */ override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) : ShuffleWriter[K, V] = { - new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context) + val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]] + shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps) + new SortShuffleWriter( + shuffleBlockManager, baseShuffleHandle, mapId, context) } /** Remove a shuffle's metadata from the ShuffleManager. */ - override def unregisterShuffle(shuffleId: Int): Unit = {} + override def unregisterShuffle(shuffleId: Int): Boolean = { + if (shuffleMapNumber.containsKey(shuffleId)) { + val numMaps = shuffleMapNumber.remove(shuffleId) + (0 until numMaps).map{ mapId => + shuffleBlockManager.removeDataByMap(shuffleId, mapId) + } + } + true + } - /** Shut down this ShuffleManager. */ - override def stop(): Unit = {} + override def shuffleBlockManager: IndexShuffleBlockManager = { + indexShuffleBlockManager + } - /** Get the location of a block in a map output file. Uses the index file we create for it. */ - def getBlockLocation(blockId: ShuffleBlockId, diskManager: DiskBlockManager): FileSegment = { - // The block is actually going to be a range of a single map output file for this map, so - // figure out the ID of the consolidated file, then the offset within that from our index - val consolidatedId = blockId.copy(reduceId = 0) - val indexFile = diskManager.getFile(consolidatedId.name + ".index") - val in = new DataInputStream(new FileInputStream(indexFile)) - try { - in.skip(blockId.reduceId * 8) - val offset = in.readLong() - val nextOffset = in.readLong() - new FileSegment(diskManager.getFile(consolidatedId), offset, nextOffset - offset) - } finally { - in.close() - } + /** Shut down this ShuffleManager. 
*/ + override def stop(): Unit = { + shuffleBlockManager.stop() } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index b8c9ad46ab035..89a78d6982ba0 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -17,29 +17,25 @@ package org.apache.spark.shuffle.sort -import java.io.File - import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.scheduler.MapStatus -import org.apache.spark.shuffle.{ShuffleWriter, BaseShuffleHandle} +import org.apache.spark.shuffle.{IndexShuffleBlockManager, ShuffleWriter, BaseShuffleHandle} import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.collection.ExternalSorter private[spark] class SortShuffleWriter[K, V, C]( + shuffleBlockManager: IndexShuffleBlockManager, handle: BaseShuffleHandle[K, V, C], mapId: Int, context: TaskContext) extends ShuffleWriter[K, V] with Logging { private val dep = handle.dependency - private val numPartitions = dep.partitioner.numPartitions private val blockManager = SparkEnv.get.blockManager private var sorter: ExternalSorter[K, V, _] = null - private var outputFile: File = null - private var indexFile: File = null // Are we in the process of stopping? Because map tasks can call stop() with success = true // and then call stop() with success = false if they get an exception, we want to make sure @@ -69,17 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C]( sorter.insertAll(records) } - // Create a single shuffle file with reduce ID 0 that we'll write all results to. We'll later - // serve different ranges of this file using an index file that we create at the end. - val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0) - - outputFile = blockManager.diskBlockManager.getFile(blockId) - indexFile = blockManager.diskBlockManager.getFile(blockId.name + ".index") - - val partitionLengths = sorter.writePartitionedFile(blockId, context) - - // Register our map output with the ShuffleBlockManager, which handles cleaning it over time - blockManager.shuffleBlockManager.addCompletedMap(dep.shuffleId, mapId, numPartitions) + val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId) + val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId) + val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) + shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths) mapStatus = new MapStatus(blockManager.blockManagerId, partitionLengths.map(MapOutputTracker.compressSize)) @@ -95,13 +84,8 @@ private[spark] class SortShuffleWriter[K, V, C]( if (success) { return Option(mapStatus) } else { - // The map task failed, so delete our output file if we created one - if (outputFile != null) { - outputFile.delete() - } - if (indexFile != null) { - indexFile.delete() - } + // The map task failed, so delete our output data. 
+ shuffleBlockManager.removeDataByMap(dep.shuffleId, mapId) return None } } finally { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index d07e6a1b1836c..e35b7fe62c753 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -197,7 +197,7 @@ object BlockFetcherIterator { for (id <- localBlocksToFetch) { try { readMetrics.localBlocksFetched += 1 - results.put(new FetchResult(id, 0, () => getLocalFromDisk(id, serializer).get)) + results.put(new FetchResult(id, 0, () => getLocalShuffleFromDisk(id, serializer).get)) logDebug("Got local block " + id) } catch { case e: Exception => { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index c1756ac905417..a83a3f468ae5f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -58,6 +58,11 @@ case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends Blo def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId } +@DeveloperApi +case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { + def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data" +} + @DeveloperApi case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index" @@ -92,6 +97,7 @@ private[spark] case class TestBlockId(id: String) extends BlockId { object BlockId { val RDD = "rdd_([0-9]+)_([0-9]+)".r val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r + val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r val TASKRESULT = "taskresult_([0-9]+)".r @@ -104,6 +110,8 @@ object BlockId { RDDBlockId(rddId.toInt, splitIndex.toInt) case SHUFFLE(shuffleId, mapId, reduceId) => ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) + case SHUFFLE_DATA(shuffleId, mapId, reduceId) => + ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) case SHUFFLE_INDEX(shuffleId, mapId, reduceId) => ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) case BROADCAST(broadcastId, field) => diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index cfe5b6c50aea2..a714142763243 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -64,8 +64,8 @@ private[spark] class BlockManager( extends BlockDataProvider with Logging { private val port = conf.getInt("spark.blockManager.port", 0) - val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager) - val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf) + + val diskBlockManager = new DiskBlockManager(this, conf) val connectionManager = new ConnectionManager(port, conf, securityManager, "Connection manager for block manager") @@ -83,7 +83,7 @@ private[spark] class BlockManager( val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}" val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998") val tachyonBlockManager 
= - new TachyonBlockManager(shuffleBlockManager, tachyonStorePath, tachyonMaster) + new TachyonBlockManager(this, tachyonStorePath, tachyonMaster) tachyonInitialized = true new TachyonStore(this, tachyonBlockManager) } @@ -215,7 +215,7 @@ private[spark] class BlockManager( override def getBlockData(blockId: String): Either[FileSegment, ByteBuffer] = { val bid = BlockId(blockId) if (bid.isShuffle) { - Left(diskBlockManager.getBlockLocation(bid)) + shuffleManager.shuffleBlockManager.getBlockData(bid.asInstanceOf[ShuffleBlockId]) } else { val blockBytesOpt = doGetLocal(bid, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] if (blockBytesOpt.isDefined) { @@ -333,8 +333,14 @@ private[spark] class BlockManager( * shuffle blocks. It is safe to do so without a lock on block info since disk store * never deletes (recent) items. */ - def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { - diskStore.getValues(blockId, serializer).orElse { + def getLocalShuffleFromDisk( + blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { + + val shuffleBlockManager = shuffleManager.shuffleBlockManager + val values = shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]).map( + bytes => this.dataDeserialize(blockId, bytes, serializer)) + + values.orElse { throw new BlockException(blockId, s"Block $blockId not found on disk, though it should be") } } @@ -355,7 +361,8 @@ private[spark] class BlockManager( // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work if (blockId.isShuffle) { - diskStore.getBytes(blockId) match { + val shuffleBlockManager = shuffleManager.shuffleBlockManager + shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]) match { case Some(bytes) => Some(bytes) case None => @@ -1045,7 +1052,6 @@ private[spark] class BlockManager( def stop(): Unit = { connectionManager.stop() - shuffleBlockManager.stop() diskBlockManager.stop() actorSystem.stop(slaveActor) blockInfo.clear() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala index c194e0fed3367..14ae2f38c5670 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala @@ -21,7 +21,7 @@ import scala.concurrent.Future import akka.actor.{ActorRef, Actor} -import org.apache.spark.{Logging, MapOutputTracker} +import org.apache.spark.{Logging, MapOutputTracker, SparkEnv} import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.ActorLogReceive @@ -55,7 +55,7 @@ class BlockManagerSlaveActor( if (mapOutputTracker != null) { mapOutputTracker.unregisterShuffle(shuffleId) } - blockManager.shuffleBlockManager.removeShuffle(shuffleId) + SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId) } case RemoveBroadcast(broadcastId, tellMaster) => diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index ec022ce9c048a..a715594f198c2 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -21,11 +21,9 @@ import java.io.File import java.text.SimpleDateFormat import java.util.{Date, Random, UUID} -import org.apache.spark.{SparkConf, 
SparkEnv, Logging} +import org.apache.spark.{SparkConf, Logging} import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.network.netty.PathResolver import org.apache.spark.util.Utils -import org.apache.spark.shuffle.sort.SortShuffleManager /** * Creates and maintains the logical mapping between logical blocks and physical on-disk @@ -36,13 +34,11 @@ import org.apache.spark.shuffle.sort.SortShuffleManager * Block files are hashed among the directories listed in spark.local.dir (or in * SPARK_LOCAL_DIRS, if it's set). */ -private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, conf: SparkConf) - extends PathResolver with Logging { +private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf) + extends Logging { private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 - - private val subDirsPerLocalDir = - shuffleBlockManager.conf.getInt("spark.diskStore.subDirectories", 64) + private val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64) /* Create one local directory for each path mentioned in spark.local.dir; then, inside this * directory, create multiple subdirectories that we will hash files into, in order to avoid @@ -56,26 +52,6 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, addShutdownHook() - /** - * Returns the physical file segment in which the given BlockId is located. If the BlockId has - * been mapped to a specific FileSegment by the shuffle layer, that will be returned. - * Otherwise, we assume the Block is mapped to the whole file identified by the BlockId. - */ - def getBlockLocation(blockId: BlockId): FileSegment = { - val env = SparkEnv.get // NOTE: can be null in unit tests - if (blockId.isShuffle && env != null && env.shuffleManager.isInstanceOf[SortShuffleManager]) { - // For sort-based shuffle, let it figure out its blocks - val sortShuffleManager = env.shuffleManager.asInstanceOf[SortShuffleManager] - sortShuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId], this) - } else if (blockId.isShuffle && shuffleBlockManager.consolidateShuffleFiles) { - // For hash-based shuffle with consolidated files, ShuffleBlockManager takes care of this - shuffleBlockManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]) - } else { - val file = getFile(blockId.name) - new FileSegment(file, 0, file.length()) - } - } - def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) @@ -105,7 +81,7 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, /** Check if disk block manager has a block. */ def containsBlock(blockId: BlockId): Boolean = { - getBlockLocation(blockId).file.exists() + getFile(blockId.name).exists() } /** List all the files currently stored on disk by the disk manager. 
*/ diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index c83261dd91b36..e9304f6bb45d0 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.io.{FileOutputStream, RandomAccessFile} +import java.io.{File, FileOutputStream, RandomAccessFile} import java.nio.ByteBuffer import java.nio.channels.FileChannel.MapMode @@ -34,7 +34,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L) override def getSize(blockId: BlockId): Long = { - diskManager.getBlockLocation(blockId).length + diskManager.getFile(blockId.name).length } override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = { @@ -89,25 +89,33 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc } } - override def getBytes(blockId: BlockId): Option[ByteBuffer] = { - val segment = diskManager.getBlockLocation(blockId) - val channel = new RandomAccessFile(segment.file, "r").getChannel + private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = { + val channel = new RandomAccessFile(file, "r").getChannel try { // For small files, directly read rather than memory map - if (segment.length < minMemoryMapBytes) { - val buf = ByteBuffer.allocate(segment.length.toInt) - channel.read(buf, segment.offset) + if (length < minMemoryMapBytes) { + val buf = ByteBuffer.allocate(length.toInt) + channel.read(buf, offset) buf.flip() Some(buf) } else { - Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length)) + Some(channel.map(MapMode.READ_ONLY, offset, length)) } } finally { channel.close() } } + override def getBytes(blockId: BlockId): Option[ByteBuffer] = { + val file = diskManager.getFile(blockId.name) + getBytes(file, 0, file.length) + } + + def getBytes(segment: FileSegment): Option[ByteBuffer] = { + getBytes(segment.file, segment.offset, segment.length) + } + override def getValues(blockId: BlockId): Option[Iterator[Any]] = { getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) } @@ -117,24 +125,25 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc * shuffle short-circuit code. */ def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { + // TODO: Should bypass getBytes and use a stream based implementation, so that + // we won't use a lot of memory during e.g. external sort merge. getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer)) } override def remove(blockId: BlockId): Boolean = { - val fileSegment = diskManager.getBlockLocation(blockId) - val file = fileSegment.file - if (file.exists() && file.length() == fileSegment.length) { + val file = diskManager.getFile(blockId.name) + // If consolidation mode is used With HashShuffleMananger, the physical filename for the block + // is different from blockId.name. So the file returns here will not be exist, thus we avoid to + // delete the whole consolidated file by mistake. 
+ if (file.exists()) { file.delete() } else { - if (fileSegment.length < file.length()) { - logWarning(s"Could not delete block associated with only a part of a file: $blockId") - } false } } override def contains(blockId: BlockId): Boolean = { - val file = diskManager.getBlockLocation(blockId).file + val file = diskManager.getFile(blockId.name) file.exists() } } diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index a6cbe3aa440ff..6908a59a79e60 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -35,7 +35,7 @@ import org.apache.spark.util.Utils * @param rootDirs The directories to use for storing block files. Data will be hashed among these. */ private[spark] class TachyonBlockManager( - shuffleManager: ShuffleBlockManager, + blockManager: BlockManager, rootDirs: String, val master: String) extends Logging { @@ -49,7 +49,7 @@ private[spark] class TachyonBlockManager( private val MAX_DIR_CREATION_ATTEMPTS = 10 private val subDirsPerTachyonDir = - shuffleManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt + blockManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName; // then, inside this directory, create multiple subdirectories that we will hash files into, diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 02df4e8fe61af..b0e3bb3b552fd 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -21,7 +21,6 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.storage.StorageLevel import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index f7f918fd521a9..eaeb861f59e5a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui.jobs -import scala.collection.mutable.{HashMap, ListBuffer, Map} +import scala.collection.mutable.{HashMap, ListBuffer} import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 5d8a648d9551e..782b979e2e93d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -719,20 +719,20 @@ private[spark] class ExternalSorter[K, V, C]( def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2) /** - * Write all the data added into this ExternalSorter into a file in the disk store, creating - * an .index file for it as well with the offsets of each partition. This is called by the - * SortShuffleWriter and can go through an efficient path of just concatenating binary files - * if we decided to avoid merge-sorting. 
+ * Write all the data added into this ExternalSorter into a file in the disk store. This is + * called by the SortShuffleWriter and can go through an efficient path of just concatenating + * binary files if we decided to avoid merge-sorting. * * @param blockId block ID to write to. The index file will be blockId.name + ".index". * @param context a TaskContext for a running Spark task, for us to update shuffle metrics. * @return array of lengths, in bytes, of each partition of the file (used by map output tracker) */ - def writePartitionedFile(blockId: BlockId, context: TaskContext): Array[Long] = { - val outputFile = blockManager.diskBlockManager.getFile(blockId) + def writePartitionedFile( + blockId: BlockId, + context: TaskContext, + outputFile: File): Array[Long] = { // Track location of each range in the output file - val offsets = new Array[Long](numPartitions + 1) val lengths = new Array[Long](numPartitions) if (bypassMergeSort && partitionWriters != null) { @@ -750,7 +750,6 @@ private[spark] class ExternalSorter[K, V, C]( in.close() in = null lengths(i) = size - offsets(i + 1) = offsets(i) + lengths(i) } } finally { if (out != null) { @@ -772,11 +771,7 @@ private[spark] class ExternalSorter[K, V, C]( } writer.commitAndClose() val segment = writer.fileSegment() - offsets(id + 1) = segment.offset + segment.length lengths(id) = segment.length - } else { - // The partition is empty; don't create a new writer to avoid writing headers, etc - offsets(id + 1) = offsets(id) } } } @@ -784,23 +779,6 @@ private[spark] class ExternalSorter[K, V, C]( context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled context.taskMetrics.diskBytesSpilled += diskBytesSpilled - // Write an index file with the offsets of each block, plus a final offset at the end for the - // end of the output file. This will be used by SortShuffleManager.getBlockLocation to figure - // out where each block begins and ends. - - val diskBlockManager = blockManager.diskBlockManager - val indexFile = diskBlockManager.getFile(blockId.name + ".index") - val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))) - try { - var i = 0 - while (i < numPartitions + 1) { - out.writeLong(offsets(i)) - i += 1 - } - } finally { - out.close() - } - lengths } @@ -811,7 +789,7 @@ private[spark] class ExternalSorter[K, V, C]( if (writer.isOpen) { writer.commitAndClose() } - blockManager.getLocalFromDisk(writer.blockId, ser).get.asInstanceOf[Iterator[Product2[K, C]]] + blockManager.diskStore.getValues(writer.blockId, ser).get.asInstanceOf[Iterator[Product2[K, C]]] } def stop(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala new file mode 100644 index 0000000000000..6061e544e79b4 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.hash + +import java.io.{File, FileWriter} + +import scala.language.reflectiveCalls + +import org.scalatest.FunSuite + +import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf} +import org.apache.spark.executor.ShuffleWriteMetrics +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.shuffle.FileShuffleBlockManager +import org.apache.spark.storage.{ShuffleBlockId, FileSegment} + +class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { + private val testConf = new SparkConf(false) + + private def checkSegments(segment1: FileSegment, segment2: FileSegment) { + assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath) + assert (segment1.offset === segment2.offset) + assert (segment1.length === segment2.length) + } + + test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") { + + val conf = new SparkConf(false) + // reset after EACH object write. This is to ensure that there are bytes appended after + // an object is written. So if the codepaths assume writeObject is end of data, this should + // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc. + conf.set("spark.serializer.objectStreamReset", "1") + conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") + conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager") + + sc = new SparkContext("local", "test", conf) + + val shuffleBlockManager = + SparkEnv.get.shuffleManager.shuffleBlockManager.asInstanceOf[FileShuffleBlockManager] + + val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf), + new ShuffleWriteMetrics) + for (writer <- shuffle1.writers) { + writer.write("test1") + writer.write("test2") + } + for (writer <- shuffle1.writers) { + writer.commitAndClose() + } + + val shuffle1Segment = shuffle1.writers(0).fileSegment() + shuffle1.releaseWriters(success = true) + + val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf), + new ShuffleWriteMetrics) + + for (writer <- shuffle2.writers) { + writer.write("test3") + writer.write("test4") + } + for (writer <- shuffle2.writers) { + writer.commitAndClose() + } + val shuffle2Segment = shuffle2.writers(0).fileSegment() + shuffle2.releaseWriters(success = true) + + // Now comes the test : + // Write to shuffle 3; and close it, but before registering it, check if the file lengths for + // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length + // of block based on remaining data in file : which could mess things up when there is concurrent read + // and writes happening to the same shuffle group. + + val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf), + new ShuffleWriteMetrics) + for (writer <- shuffle3.writers) { + writer.write("test3") + writer.write("test4") + } + for (writer <- shuffle3.writers) { + writer.commitAndClose() + } + // check before we register. 
+ checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)).left.get) + shuffle3.releaseWriters(success = true) + checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)).left.get) + shuffleBlockManager.removeShuffle(1) + + } + + + def writeToFile(file: File, numBytes: Int) { + val writer = new FileWriter(file, true) + for (i <- 0 until numBytes) writer.write(i) + writer.close() + } +} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala index fbfcb5156d496..3c86f6bafcaa3 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala @@ -60,11 +60,11 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { } // 3rd block is going to fail - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any()) - doAnswer(answer).when(blockManager).getLocalFromDisk(meq(blIds(2)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any()) + doAnswer(answer).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any()) val bmId = BlockManagerId("test-client", "test-client", 1) val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( @@ -76,24 +76,24 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { iterator.initialize() - // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk. - verify(blockManager, times(0)).getLocalFromDisk(any(), any()) + // Without exhausting the iterator, the iterator should be lazy and not call getLocalShuffleFromDisk. 
+ verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements") // the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully assert(iterator.next()._2.isDefined, "1st element should be defined but is not actually defined") - verify(blockManager, times(1)).getLocalFromDisk(any(), any()) + verify(blockManager, times(1)).getLocalShuffleFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element") assert(iterator.next()._2.isDefined, "2nd element should be defined but is not actually defined") - verify(blockManager, times(2)).getLocalFromDisk(any(), any()) + verify(blockManager, times(2)).getLocalShuffleFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements") // 3rd fetch should be failed intercept[Exception] { iterator.next() } - verify(blockManager, times(3)).getLocalFromDisk(any(), any()) + verify(blockManager, times(3)).getLocalShuffleFromDisk(any(), any()) } @@ -115,11 +115,11 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { val optItr = mock(classOf[Option[Iterator[Any]]]) // All blocks should be fetched successfully - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(2)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any()) val bmId = BlockManagerId("test-client", "test-client", 1) val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( @@ -131,8 +131,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { iterator.initialize() - // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk. - verify(blockManager, times(0)).getLocalFromDisk(any(), any()) + // Without exhausting the iterator, the iterator should be lazy and not call getLocalShuffleFromDisk. 
+ verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements") assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined") @@ -145,7 +145,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements") assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined") - verify(blockManager, times(5)).getLocalFromDisk(any(), any()) + verify(blockManager, times(5)).getLocalShuffleFromDisk(any(), any()) } test("block fetch from remote fails using BasicBlockFetcherIterator") { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index bdcea07e5714f..14ffadab99cae 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -49,6 +49,7 @@ import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.implicitConversions import scala.language.postfixOps +import org.apache.spark.shuffle.ShuffleBlockManager class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester { @@ -823,11 +824,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter // be nice to refactor classes involved in disk storage in a way that // allows for easier testing. val blockManager = mock(classOf[BlockManager]) - val shuffleBlockManager = mock(classOf[ShuffleBlockManager]) - when(shuffleBlockManager.conf).thenReturn(conf) - val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf) - when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString)) + val diskBlockManager = new DiskBlockManager(blockManager, conf) + val diskStoreMapped = new DiskStore(blockManager, diskBlockManager) diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY) val mapped = diskStoreMapped.getBytes(blockId).get diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index aabaeadd7a071..26082ded8ca7a 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -26,6 +26,7 @@ import scala.language.reflectiveCalls import akka.actor.Props import com.google.common.io.Files +import org.mockito.Mockito.{mock, when} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} import org.apache.spark.SparkConf @@ -40,18 +41,8 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before private var rootDir1: File = _ private var rootDirs: String = _ - // This suite focuses primarily on consolidation features, - // so we coerce consolidation if not already enabled. 
- testConf.set("spark.shuffle.consolidateFiles", "true") - - private val shuffleManager = new HashShuffleManager(testConf.clone) - - val shuffleBlockManager = new ShuffleBlockManager(null, shuffleManager) { - override def conf = testConf.clone - var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]() - override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id) - } - + val blockManager = mock(classOf[BlockManager]) + when(blockManager.conf).thenReturn(testConf) var diskBlockManager: DiskBlockManager = _ override def beforeAll() { @@ -73,22 +64,17 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before override def beforeEach() { val conf = testConf.clone conf.set("spark.local.dir", rootDirs) - diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf) - shuffleBlockManager.idToSegmentMap.clear() + diskBlockManager = new DiskBlockManager(blockManager, conf) } override def afterEach() { diskBlockManager.stop() - shuffleBlockManager.idToSegmentMap.clear() } test("basic block creation") { val blockId = new TestBlockId("test") - assertSegmentEquals(blockId, blockId.name, 0, 0) - val newFile = diskBlockManager.getFile(blockId) writeToFile(newFile, 10) - assertSegmentEquals(blockId, blockId.name, 0, 10) assert(diskBlockManager.containsBlock(blockId)) newFile.delete() assert(!diskBlockManager.containsBlock(blockId)) @@ -101,127 +87,6 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before assert(diskBlockManager.getAllBlocks.toSet === ids.toSet) } - test("block appending") { - val blockId = new TestBlockId("test") - val newFile = diskBlockManager.getFile(blockId) - writeToFile(newFile, 15) - assertSegmentEquals(blockId, blockId.name, 0, 15) - val newFile2 = diskBlockManager.getFile(blockId) - assert(newFile === newFile2) - writeToFile(newFile2, 12) - assertSegmentEquals(blockId, blockId.name, 0, 27) - newFile.delete() - } - - test("block remapping") { - val filename = "test" - val blockId0 = new ShuffleBlockId(1, 2, 3) - val newFile = diskBlockManager.getFile(filename) - writeToFile(newFile, 15) - shuffleBlockManager.idToSegmentMap(blockId0) = new FileSegment(newFile, 0, 15) - assertSegmentEquals(blockId0, filename, 0, 15) - - val blockId1 = new ShuffleBlockId(1, 2, 4) - val newFile2 = diskBlockManager.getFile(filename) - writeToFile(newFile2, 12) - shuffleBlockManager.idToSegmentMap(blockId1) = new FileSegment(newFile, 15, 12) - assertSegmentEquals(blockId1, filename, 15, 12) - - assert(newFile === newFile2) - newFile.delete() - } - - private def checkSegments(segment1: FileSegment, segment2: FileSegment) { - assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath) - assert (segment1.offset === segment2.offset) - assert (segment1.length === segment2.length) - } - - test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") { - - val serializer = new JavaSerializer(testConf) - val confCopy = testConf.clone - // reset after EACH object write. This is to ensure that there are bytes appended after - // an object is written. So if the codepaths assume writeObject is end of data, this should - // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc. - confCopy.set("spark.serializer.objectStreamReset", "1") - - val securityManager = new org.apache.spark.SecurityManager(confCopy) - // Do not use the shuffleBlockManager above ! 
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy, - securityManager) - val master = new BlockManagerMaster( - actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))), - confCopy) - val store = new BlockManager("", actorSystem, master , serializer, confCopy, - securityManager, null, shuffleManager) - - try { - - val shuffleManager = store.shuffleBlockManager - - val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer, new ShuffleWriteMetrics) - for (writer <- shuffle1.writers) { - writer.write("test1") - writer.write("test2") - } - for (writer <- shuffle1.writers) { - writer.commitAndClose() - } - - val shuffle1Segment = shuffle1.writers(0).fileSegment() - shuffle1.releaseWriters(success = true) - - val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf), - new ShuffleWriteMetrics) - - for (writer <- shuffle2.writers) { - writer.write("test3") - writer.write("test4") - } - for (writer <- shuffle2.writers) { - writer.commitAndClose() - } - val shuffle2Segment = shuffle2.writers(0).fileSegment() - shuffle2.releaseWriters(success = true) - - // Now comes the test : - // Write to shuffle 3; and close it, but before registering it, check if the file lengths for - // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length - // of block based on remaining data in file : which could mess things up when there is concurrent read - // and writes happening to the same shuffle group. - - val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf), - new ShuffleWriteMetrics) - for (writer <- shuffle3.writers) { - writer.write("test3") - writer.write("test4") - } - for (writer <- shuffle3.writers) { - writer.commitAndClose() - } - // check before we register. - checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0))) - shuffle3.releaseWriters(success = true) - checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0))) - shuffleManager.removeShuffle(1) - } finally { - - if (store != null) { - store.stop() - } - actorSystem.shutdown() - actorSystem.awaitTermination() - } - } - - def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) { - val segment = diskBlockManager.getBlockLocation(blockId) - assert(segment.file.getName === filename) - assert(segment.offset === offset) - assert(segment.length === length) - } - def writeToFile(file: File, numBytes: Int) { val writer = new FileWriter(file, true) for (i <- 0 until numBytes) writer.write(i) diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh index 7549fbbe66654..281e8d4de6d71 100755 --- a/dev/create-release/create-release.sh +++ b/dev/create-release/create-release.sh @@ -118,6 +118,7 @@ make_binary_release() { } make_binary_release "hadoop1" "-Phive -Dhadoop.version=1.0.4" & +make_binary_release "cdh4" "-Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0" & make_binary_release "hadoop2.3" "-Phadoop-2.3 -Phive -Pyarn" & make_binary_release "hadoop2.4" "-Phadoop-2.4 -Phive -Pyarn" & make_binary_release "hadoop2.4-without-hive" "-Phadoop-2.4 -Pyarn" & diff --git a/docs/README.md b/docs/README.md index fd7ba4e0d72ea..0a0126c5747d1 100644 --- a/docs/README.md +++ b/docs/README.md @@ -30,7 +30,7 @@ called `_site` containing index.html as well as the rest of the compiled files. 
You can modify the default Jekyll build as follows: # Skip generating API docs (which takes a while) - $ SKIP_SCALADOC=1 jekyll build + $ SKIP_API=1 jekyll build # Serve content locally on port 4000 $ jekyll serve --watch # Build the site with extra features used on the live page diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index c41f2804a6021..8f7fb5431cfb6 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -474,10 +474,10 @@ anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD) Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/). However, since Hive has a large number of dependencies, it is not included in the default Spark assembly. -In order to use Hive you must first run '`sbt/sbt -Phive assembly/assembly`' (or use `-Phive` for maven). +In order to use Hive you must first run "`sbt/sbt -Phive assembly/assembly`" (or use `-Phive` for maven). This command builds a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries -(SerDes) in order to acccess data stored in Hive. +(SerDes) in order to access data stored in Hive. Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. @@ -576,9 +576,8 @@ evaluated by the SQL execution engine. A full list of the functions supported c ## Running the Thrift JDBC server -The Thrift JDBC server implemented here corresponds to the [`HiveServer2`] -(https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2) in Hive 0.12. You can test -the JDBC server with the beeline script comes with either Spark or Hive 0.12. +The Thrift JDBC server implemented here corresponds to the [`HiveServer2`](https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2) +in Hive 0.12. You can test the JDBC server with the beeline script comes with either Spark or Hive 0.12. To start the JDBC server, run the following in the Spark directory: @@ -597,7 +596,7 @@ Connect to the JDBC server in beeline with: Beeline will ask you for a username and password. In non-secure mode, simply enter the username on your machine and a blank password. For secure mode, please follow the instructions given in the -[beeline documentation](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients) +[beeline documentation](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients). Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. @@ -616,11 +615,10 @@ In Shark, default reducer number is 1 and is controlled by the property `mapred. SQL deprecates this property by a new property `spark.sql.shuffle.partitions`, whose default value is 200. Users may customize this property via `SET`: -``` -SET spark.sql.shuffle.partitions=10; -SELECT page, count(*) c FROM logs_last_month_cached -GROUP BY page ORDER BY c DESC LIMIT 10; -``` + SET spark.sql.shuffle.partitions=10; + SELECT page, count(*) c + FROM logs_last_month_cached + GROUP BY page ORDER BY c DESC LIMIT 10; You may also put this property in `hive-site.xml` to override the default value. @@ -630,22 +628,18 @@ For now, the `mapred.reduce.tasks` property is still recognized, and is converte #### Caching The `shark.cache` table property no longer exists, and tables whose name end with `_cached` are no -longer automcatically cached. 
Instead, we provide `CACHE TABLE` and `UNCACHE TABLE` statements to +longer automatically cached. Instead, we provide `CACHE TABLE` and `UNCACHE TABLE` statements to let user control table caching explicitly: -``` -CACHE TABLE logs_last_month; -UNCACHE TABLE logs_last_month; -``` + CACHE TABLE logs_last_month; + UNCACHE TABLE logs_last_month; -**NOTE** `CACHE TABLE tbl` is lazy, it only marks table `tbl` as "need to by cached if necessary", +**NOTE:** `CACHE TABLE tbl` is lazy, it only marks table `tbl` as "need to by cached if necessary", but doesn't actually cache it until a query that touches `tbl` is executed. To force the table to be cached, you may simply count the table immediately after executing `CACHE TABLE`: -``` -CACHE TABLE logs_last_month; -SELECT COUNT(1) FROM logs_last_month; -``` + CACHE TABLE logs_last_month; + SELECT COUNT(1) FROM logs_last_month; Several caching related features are not supported yet: @@ -655,7 +649,7 @@ Several caching related features are not supported yet: ### Compatibility with Apache Hive -#### Deploying in Exising Hive Warehouses +#### Deploying in Existing Hive Warehouses Spark SQL Thrift JDBC server is designed to be "out of the box" compatible with existing Hive installations. You do not need to modify your existing Hive Metastore or change the data placement @@ -666,50 +660,50 @@ or partitioning of your tables. Spark SQL supports the vast majority of Hive features, such as: * Hive query statements, including: - * `SELECT` - * `GROUP BY - * `ORDER BY` - * `CLUSTER BY` - * `SORT BY` + * `SELECT` + * `GROUP BY` + * `ORDER BY` + * `CLUSTER BY` + * `SORT BY` * All Hive operators, including: - * Relational operators (`=`, `⇔`, `==`, `<>`, `<`, `>`, `>=`, `<=`, etc) - * Arthimatic operators (`+`, `-`, `*`, `/`, `%`, etc) - * Logical operators (`AND`, `&&`, `OR`, `||`, etc) - * Complex type constructors - * Mathemtatical functions (`sign`, `ln`, `cos`, etc) - * String functions (`instr`, `length`, `printf`, etc) + * Relational operators (`=`, `⇔`, `==`, `<>`, `<`, `>`, `>=`, `<=`, etc) + * Arithmetic operators (`+`, `-`, `*`, `/`, `%`, etc) + * Logical operators (`AND`, `&&`, `OR`, `||`, etc) + * Complex type constructors + * Mathematical functions (`sign`, `ln`, `cos`, etc) + * String functions (`instr`, `length`, `printf`, etc) * User defined functions (UDF) * User defined aggregation functions (UDAF) -* User defined serialization formats (SerDe's) +* User defined serialization formats (SerDes) * Joins - * `JOIN` - * `{LEFT|RIGHT|FULL} OUTER JOIN` - * `LEFT SEMI JOIN` - * `CROSS JOIN` + * `JOIN` + * `{LEFT|RIGHT|FULL} OUTER JOIN` + * `LEFT SEMI JOIN` + * `CROSS JOIN` * Unions -* Sub queries - * `SELECT col FROM ( SELECT a + b AS col from t1) t2` +* Sub-queries + * `SELECT col FROM ( SELECT a + b AS col from t1) t2` * Sampling * Explain * Partitioned tables * All Hive DDL Functions, including: - * `CREATE TABLE` - * `CREATE TABLE AS SELECT` - * `ALTER TABLE` + * `CREATE TABLE` + * `CREATE TABLE AS SELECT` + * `ALTER TABLE` * Most Hive Data types, including: - * `TINYINT` - * `SMALLINT` - * `INT` - * `BIGINT` - * `BOOLEAN` - * `FLOAT` - * `DOUBLE` - * `STRING` - * `BINARY` - * `TIMESTAMP` - * `ARRAY<>` - * `MAP<>` - * `STRUCT<>` + * `TINYINT` + * `SMALLINT` + * `INT` + * `BIGINT` + * `BOOLEAN` + * `FLOAT` + * `DOUBLE` + * `STRING` + * `BINARY` + * `TIMESTAMP` + * `ARRAY<>` + * `MAP<>` + * `STRUCT<>` #### Unsupported Hive Functionality @@ -749,8 +743,7 @@ releases of Spark SQL. Hive automatically converts the join into a map join. 
We are adding this auto conversion in the next release. * Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you - need to control the degree of parallelism post-shuffle using "SET - spark.sql.shuffle.partitions=[num_tasks];". We are going to add auto-setting of parallelism in the + need to control the degree of parallelism post-shuffle using "`SET spark.sql.shuffle.partitions=[num_tasks];`". We are going to add auto-setting of parallelism in the next release. * Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still launches tasks to compute the result. diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala index f96bc1bf00b92..89dfa26c2299c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala @@ -27,7 +27,7 @@ import org.apache.spark.mllib.linalg.Vectors /** * An example k-means app. Run with * {{{ - * ./bin/spark-example org.apache.spark.examples.mllib.DenseKMeans [options] + * ./bin/run-example org.apache.spark.examples.mllib.DenseKMeans [options] * }}} * If you use it as a template to create your own app, please use `spark-submit` to submit your app. */ diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala index 88acd9dbb0878..952fa2a5109a4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala @@ -27,7 +27,7 @@ import org.apache.spark.mllib.util.MLUtils /** * An example naive Bayes app. Run with * {{{ - * ./bin/spark-example org.apache.spark.examples.mllib.SparseNaiveBayes [options] + * ./bin/run-example org.apache.spark.examples.mllib.SparseNaiveBayes [options] * }}} * If you use it as a template to create your own app, please use `spark-submit` to submit your app. */ diff --git a/make-distribution.sh b/make-distribution.sh index f7a6a9d838bb6..ee1399071112d 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -113,7 +113,17 @@ if ! 
which mvn &>/dev/null; then echo -e "Download Maven from https://maven.apache.org/" exit -1; fi + VERSION=$(mvn help:evaluate -Dexpression=project.version 2>/dev/null | grep -v "INFO" | tail -n 1) +SPARK_HADOOP_VERSION=$(mvn help:evaluate -Dexpression=hadoop.version $@ 2>/dev/null\ + | grep -v "INFO"\ + | tail -n 1) +SPARK_HIVE=$(mvn help:evaluate -Dexpression=project.activeProfiles $@ 2>/dev/null\ + | grep -v "INFO"\ + | fgrep --count "hive";\ + # Reset exit status to 0, otherwise the script stops here if the last grep finds nothing\ + # because we use "set -o pipefail" + echo -n) JAVA_CMD="$JAVA_HOME"/bin/java JAVA_VERSION=$("$JAVA_CMD" -version 2>&1) @@ -175,7 +185,7 @@ cp "$FWDIR"/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/" mkdir -p "$DISTDIR/examples/src/main" cp -r "$FWDIR"/examples/src/main "$DISTDIR/examples/src/" -if [ "$SPARK_HIVE" == "true" ]; then +if [ "$SPARK_HIVE" == "1" ]; then cp "$FWDIR"/lib_managed/jars/datanucleus*.jar "$DISTDIR/lib/" fi diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 300589394b96f..fe8ffe6d97a05 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -58,6 +58,8 @@ object MimaExcludes { "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"), ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.storage.DiskStore.getValues"), ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.storage.MemoryStore.Entry") ) ++ diff --git a/python/pyspark/context.py b/python/pyspark/context.py index a90870ed3a353..82f76de31afc1 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -568,8 +568,6 @@ def broadcast(self, value): L{Broadcast} object for reading it in distributed functions. The variable will be sent to each cluster only once. - - :keep: Keep the `value` in driver or not. """ ser = CompressedSerializer(PickleSerializer()) # pass large object by py4j is very slow and need much memory diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 2c73a80f64ebf..a88bd859fc85e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -114,6 +114,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val STRING = Keyword("STRING") protected val SUM = Keyword("SUM") protected val TABLE = Keyword("TABLE") + protected val TIMESTAMP = Keyword("TIMESTAMP") protected val TRUE = Keyword("TRUE") protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") @@ -122,6 +123,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val EXCEPT = Keyword("EXCEPT") protected val SUBSTR = Keyword("SUBSTR") protected val SUBSTRING = Keyword("SUBSTRING") + protected val SQRT = Keyword("SQRT") // Use reflection to find the reserved words defined in this class. 
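// Illustrative sketch, not part of SqlParser: the kind of queries the new SQRT and
// TIMESTAMP keywords enable, run end to end through a SQLContext. The case class, table
// and app names below are made up for the example.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class KV(key: Int, value: String)

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("parser-sketch"))
val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD

sc.parallelize((1 to 10).map(i => KV(i, i.toString))).registerTempTable("kv")

// SQRT(...) now parses as a built-in expression instead of an unresolved UDF.
sqlContext.sql("SELECT SQRT(key) FROM kv").collect()

// TIMESTAMP is now a recognized type name, so CAST(... AS TIMESTAMP) parses.
sqlContext.sql("SELECT CAST('1970-01-01 00:00:00.001' AS TIMESTAMP) FROM kv").collect()

sc.stop()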
protected val reservedWords = @@ -323,6 +325,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { (SUBSTR | SUBSTRING) ~> "(" ~> expression ~ "," ~ expression ~ "," ~ expression <~ ")" ^^ { case s ~ "," ~ p ~ "," ~ l => Substring(s,p,l) } | + SQRT ~> "(" ~> expression <~ ")" ^^ { case exp => Sqrt(exp) } | ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ { case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs) } @@ -357,7 +360,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { literal protected lazy val dataType: Parser[DataType] = - STRING ^^^ StringType + STRING ^^^ StringType | TIMESTAMP ^^^ TimestampType } class SqlLexical(val keywords: Seq[String]) extends StdLexical { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index 15eb5982a4a91..d6758eb5b6a32 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -218,15 +218,27 @@ trait HiveTypeCoercion { case a: BinaryArithmetic if a.right.dataType == StringType => a.makeCopy(Array(a.left, Cast(a.right, DoubleType))) + case p: BinaryPredicate if p.left.dataType == StringType + && p.right.dataType == TimestampType => + p.makeCopy(Array(Cast(p.left, TimestampType), p.right)) + case p: BinaryPredicate if p.left.dataType == TimestampType + && p.right.dataType == StringType => + p.makeCopy(Array(p.left, Cast(p.right, TimestampType))) + case p: BinaryPredicate if p.left.dataType == StringType && p.right.dataType != StringType => p.makeCopy(Array(Cast(p.left, DoubleType), p.right)) case p: BinaryPredicate if p.left.dataType != StringType && p.right.dataType == StringType => p.makeCopy(Array(p.left, Cast(p.right, DoubleType))) + case i @ In(a,b) if a.dataType == TimestampType && b.forall(_.dataType == StringType) => + i.makeCopy(Array(a,b.map(Cast(_,TimestampType)))) + case Sum(e) if e.dataType == StringType => Sum(Cast(e, DoubleType)) case Average(e) if e.dataType == StringType => Average(Cast(e, DoubleType)) + case Sqrt(e) if e.dataType == StringType => + Sqrt(Cast(e, DoubleType)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala index aae86a3628be1..56f042891a2e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala @@ -33,6 +33,19 @@ case class UnaryMinus(child: Expression) extends UnaryExpression { } } +case class Sqrt(child: Expression) extends UnaryExpression { + type EvaluatedType = Any + + def dataType = child.dataType + override def foldable = child.foldable + def nullable = child.nullable + override def toString = s"SQRT($child)" + + override def eval(input: Row): Any = { + n1(child, input, ((na,a) => math.sqrt(na.toDouble(a)))) + } +} + abstract class BinaryArithmetic extends BinaryExpression { self: Product => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 90923fe31a063..f0fd9a8b9a46e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.catalyst.planning import scala.annotation.tailrec -import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.trees.TreeNodeRef +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -134,8 +135,8 @@ object PartialAggregation { // Only do partial aggregation if supported by all aggregate expressions. if (allAggregates.size == partialAggregates.size) { // Create a map of expressions to their partial evaluations for all aggregate expressions. - val partialEvaluations: Map[Long, SplitEvaluation] = - partialAggregates.map(a => (a.id, a.asPartial)).toMap + val partialEvaluations: Map[TreeNodeRef, SplitEvaluation] = + partialAggregates.map(a => (new TreeNodeRef(a), a.asPartial)).toMap // We need to pass all grouping expressions though so the grouping can happen a second // time. However some of them might be unnamed so we alias them allowing them to be @@ -148,8 +149,8 @@ object PartialAggregation { // Replace aggregations with a new expression that computes the result from the already // computed partial evaluations and grouping values. val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp { - case e: Expression if partialEvaluations.contains(e.id) => - partialEvaluations(e.id).finalEvaluation + case e: Expression if partialEvaluations.contains(new TreeNodeRef(e)) => + partialEvaluations(new TreeNodeRef(e)).finalEvaluation case e: Expression if namedGroupingExpressions.contains(e) => namedGroupingExpressions(e).toAttribute }).asInstanceOf[Seq[NamedExpression]] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 1e177e28f80b3..af9e4d86e995a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -50,11 +50,11 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy @inline def transformExpressionDown(e: Expression) = { val newE = e.transformDown(rule) - if (newE.id != e.id && newE != e) { + if (newE.fastEquals(e)) { + e + } else { changed = true newE - } else { - e } } @@ -82,11 +82,11 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy @inline def transformExpressionUp(e: Expression) = { val newE = e.transformUp(rule) - if (newE.id != e.id && newE != e) { + if (newE.fastEquals(e)) { + e + } else { changed = true newE - } else { - e } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 96ce35939e2cc..2013ae4f7bd13 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -19,11 +19,6 @@ package org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.errors._ -object TreeNode { - private val currentId = new java.util.concurrent.atomic.AtomicLong - protected def nextId() = currentId.getAndIncrement() -} - /** Used by [[TreeNode.getNodeNumbered]] when traversing the tree for a given number */ private class MutableInt(var 
i: Int) @@ -33,29 +28,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { /** Returns a Seq of the children of this node */ def children: Seq[BaseType] - /** - * A globally unique id for this specific instance. Not preserved across copies. - * Unlike `equals`, `id` can be used to differentiate distinct but structurally - * identical branches of a tree. - */ - val id = TreeNode.nextId() - - /** - * Returns true if other is the same [[catalyst.trees.TreeNode TreeNode]] instance. Unlike - * `equals` this function will return false for different instances of structurally identical - * trees. - */ - def sameInstance(other: TreeNode[_]): Boolean = { - this.id == other.id - } - /** * Faster version of equality which short-circuits when two treeNodes are the same instance. * We don't just override Object.Equals, as doing so prevents the scala compiler from from * generating case class `equals` methods */ def fastEquals(other: TreeNode[_]): Boolean = { - sameInstance(other) || this == other + this.eq(other) || this == other } /** @@ -393,3 +372,4 @@ trait UnaryNode[BaseType <: TreeNode[BaseType]] { def child: BaseType def children = child :: Nil } + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala index d725a92c06f7b..79a8e06d4b4d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala @@ -37,4 +37,15 @@ package object trees extends Logging { // Since we want tree nodes to be lightweight, we create one logger for all treenode instances. protected override def logName = "catalyst.trees" + /** + * A [[TreeNode]] companion for reference equality for Hash based Collection. 
+ */ + class TreeNodeRef(val obj: TreeNode[_]) { + override def equals(o: Any) = o match { + case that: TreeNodeRef => that.obj.eq(obj) + case _ => false + } + + override def hashCode = if (obj == null) 0 else obj.hashCode + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala index f1df817c41362..b961346dfc995 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala @@ -577,4 +577,17 @@ class ExpressionEvaluationSuite extends FunSuite { checkEvaluation(s.substring(0, 2), "ex", row) checkEvaluation(s.substring(0), "example", row) } + + test("SQRT") { + val inputSequence = (1 to (1<<24) by 511).map(_ * (1L<<24)) + val expectedResults = inputSequence.map(l => math.sqrt(l.toDouble)) + val rowSequence = inputSequence.map(l => new GenericRow(Array[Any](l.toDouble))) + val d = 'a.double.at(0) + + for ((row, expected) <- rowSequence zip expectedResults) { + checkEvaluation(Sqrt(d), expected, row) + } + + checkEvaluation(Sqrt(Literal(null, DoubleType)), null, new GenericRow(Array[Any](null))) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala index 296202543e2ca..036fd3fa1d6a1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala @@ -51,7 +51,10 @@ class TreeNodeSuite extends FunSuite { val after = before transform { case Literal(5, _) => Literal(1)} assert(before === after) - assert(before.map(_.id) === after.map(_.id)) + // Ensure that the objects after are the same objects before the transformation. + before.map(identity[Expression]).zip(after.map(identity[Expression])).foreach { + case (b, a) => assert(b eq a) + } } test("collect") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index bc36bacd00b13..cb055cd74a5e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -104,40 +104,29 @@ private[sql] case class InMemoryColumnarTableScan( override def execute() = { relation.cachedColumnBuffers.mapPartitions { iterator => // Find the ordinals of the requested columns. If none are requested, use the first. 
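// Illustrative sketch, not part of the patch: why the TreeNodeRef wrapper and the
// reference-based fastEquals above can replace the old per-node `id`. Two structurally
// identical expressions are `==` equal, but TreeNodeRef keys them by instance identity,
// which is what the partial-aggregation and debug bookkeeping needs.
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.trees.TreeNodeRef

val a = Literal(1)
val b = Literal(1)
assert(a == b)            // structurally equal
assert(a.fastEquals(b))   // fastEquals falls back to structural equality
assert(!(a eq b))         // but they are distinct instances

val partialResults = Map(new TreeNodeRef(a) -> "partial result for a")
assert(partialResults.contains(new TreeNodeRef(a)))   // same instance: found
assert(!partialResults.contains(new TreeNodeRef(b)))  // equal but different instance: not found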
- val requestedColumns = - if (attributes.isEmpty) { - Seq(0) - } else { - attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) - } - - new Iterator[Row] { - private[this] var columnBuffers: Array[ByteBuffer] = null - private[this] var columnAccessors: Seq[ColumnAccessor] = null - nextBatch() - - private[this] val nextRow = new GenericMutableRow(columnAccessors.length) - - def nextBatch() = { - columnBuffers = iterator.next() - columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_)) - } + val requestedColumns = if (attributes.isEmpty) { + Seq(0) + } else { + attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) + } - override def next() = { - if (!columnAccessors.head.hasNext) { - nextBatch() - } + iterator + .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_))) + .flatMap { columnAccessors => + val nextRow = new GenericMutableRow(columnAccessors.length) + new Iterator[Row] { + override def next() = { + var i = 0 + while (i < nextRow.length) { + columnAccessors(i).extractTo(nextRow, i) + i += 1 + } + nextRow + } - var i = 0 - while (i < nextRow.length) { - columnAccessors(i).extractTo(nextRow, i) - i += 1 + override def hasNext = columnAccessors.head.hasNext } - nextRow } - - override def hasNext = columnAccessors.head.hasNext || iterator.hasNext - } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala index 31ad5e8aabb0e..b3edd5020fa8c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.trees._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.types._ @@ -141,9 +142,10 @@ case class GeneratedAggregate( val computationSchema = computeFunctions.flatMap(_.schema) - val resultMap: Map[Long, Expression] = aggregatesToCompute.zip(computeFunctions).map { - case (agg, func) => agg.id -> func.result - }.toMap + val resultMap: Map[TreeNodeRef, Expression] = + aggregatesToCompute.zip(computeFunctions).map { + case (agg, func) => new TreeNodeRef(agg) -> func.result + }.toMap val namedGroups = groupingExpressions.zipWithIndex.map { case (ne: NamedExpression, _) => (ne, ne) @@ -156,7 +158,7 @@ case class GeneratedAggregate( // The set of expressions that produce the final output given the aggregation buffer and the // grouping expressions. 
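// Illustrative sketch, not part of the patch: the plain-Scala shape of the
// InMemoryColumnarTableScan rewrite above. Building per-batch iterators inside flatMap
// never touches an empty upstream iterator, unlike the previous version that eagerly
// fetched the first batch in the Iterator constructor (the SPARK-3320 empty-partition bug).
def rowsFrom(batches: Iterator[Array[Int]]): Iterator[Int] =
  batches.flatMap { batch =>
    new Iterator[Int] {
      private var i = 0
      override def hasNext = i < batch.length
      override def next() = { val v = batch(i); i += 1; v }
    }
  }

assert(rowsFrom(Iterator.empty).isEmpty)                          // empty partition: no failure
assert(rowsFrom(Iterator(Array(1, 2), Array(3))).toList == List(1, 2, 3))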
val resultExpressions = aggregateExpressions.map(_.transform { - case e: Expression if resultMap.contains(e.id) => resultMap(e.id) + case e: Expression if resultMap.contains(new TreeNodeRef(e)) => resultMap(new TreeNodeRef(e)) case e: Expression if groupMap.contains(e) => groupMap(e) }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 5b896c55b7393..8ff757bbe3508 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -23,6 +23,7 @@ import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.SparkContext._ import org.apache.spark.sql.{SchemaRDD, Row} +import org.apache.spark.sql.catalyst.trees.TreeNodeRef /** * :: DeveloperApi :: @@ -43,10 +44,10 @@ package object debug { implicit class DebugQuery(query: SchemaRDD) { def debug(): Unit = { val plan = query.queryExecution.executedPlan - val visited = new collection.mutable.HashSet[Long]() + val visited = new collection.mutable.HashSet[TreeNodeRef]() val debugPlan = plan transform { - case s: SparkPlan if !visited.contains(s.id) => - visited += s.id + case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) => + visited += new TreeNodeRef(s) DebugNode(s) } println(s"Results returned: ${debugPlan.execute().count()}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index aef6ebf86b1eb..3dc8be2456781 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -98,7 +98,7 @@ private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] { logical.Project( l.output, l.transformExpressions { - case p: PythonUDF if p.id == udf.id => evaluation.resultAttribute + case p: PythonUDF if p.fastEquals(udf) => evaluation.resultAttribute }.withNewChildren(newChildren)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 9b2a36d33fca7..1ac205937714c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -19,21 +19,48 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.test._ +import org.scalatest.BeforeAndAfterAll +import java.util.TimeZone /* Implicits */ import TestSQLContext._ import TestData._ -class SQLQuerySuite extends QueryTest { +class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { // Make sure the tables are loaded. 
TestData + var origZone: TimeZone = _ + override protected def beforeAll() { + origZone = TimeZone.getDefault + TimeZone.setDefault(TimeZone.getTimeZone("UTC")) + } + + override protected def afterAll() { + TimeZone.setDefault(origZone) + } + + test("SPARK-2041 column name equals tablename") { checkAnswer( sql("SELECT tableName FROM tableName"), "test") } + test("SQRT") { + checkAnswer( + sql("SELECT SQRT(key) FROM testData"), + (1 to 100).map(x => Row(math.sqrt(x.toDouble))).toSeq + ) + } + + test("SQRT with automatic string casts") { + checkAnswer( + sql("SELECT SQRT(CAST(key AS STRING)) FROM testData"), + (1 to 100).map(x => Row(math.sqrt(x.toDouble))).toSeq + ) + } + test("SPARK-2407 Added Parser of SQL SUBSTR()") { checkAnswer( sql("SELECT substr(tableName, 1, 2) FROM tableName"), @@ -49,6 +76,34 @@ class SQLQuerySuite extends QueryTest { "st") } + test("SPARK-3173 Timestamp support in the parser") { + checkAnswer(sql( + "SELECT time FROM timestamps WHERE time=CAST('1970-01-01 00:00:00.001' AS TIMESTAMP)"), + Seq(Seq(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.001")))) + + checkAnswer(sql( + "SELECT time FROM timestamps WHERE time='1970-01-01 00:00:00.001'"), + Seq(Seq(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.001")))) + + checkAnswer(sql( + "SELECT time FROM timestamps WHERE '1970-01-01 00:00:00.001'=time"), + Seq(Seq(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.001")))) + + checkAnswer(sql( + """SELECT time FROM timestamps WHERE time<'1970-01-01 00:00:00.003' + AND time>'1970-01-01 00:00:00.001'"""), + Seq(Seq(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.002")))) + + checkAnswer(sql( + "SELECT time FROM timestamps WHERE time IN ('1970-01-01 00:00:00.001','1970-01-01 00:00:00.002')"), + Seq(Seq(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.001")), + Seq(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.002")))) + + checkAnswer(sql( + "SELECT time FROM timestamps WHERE time='123'"), + Nil) + } + test("index into array") { checkAnswer( sql("SELECT data, data[0], data[0] + data[1], data[0 + 1] FROM arrayData"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index c3ec82fb69778..eb33a61c6e811 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -151,4 +151,9 @@ object TestData { TimestampField(new Timestamp(i)) }) timestamps.registerTempTable("timestamps") + + case class IntField(i: Int) + // An RDD with 4 elements and 8 partitions + val withEmptyParts = TestSQLContext.sparkContext.parallelize((1 to 4).map(IntField), 8) + withEmptyParts.registerTempTable("withEmptyParts") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala index fdd2799a53268..0e3c67f5eed29 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala @@ -17,14 +17,13 @@ package org.apache.spark.sql.columnar -import org.apache.spark.sql.{QueryTest, TestData} import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.execution.SparkLogicalPlan import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.{SQLConf, QueryTest, TestData} class InMemoryColumnarQuerySuite extends QueryTest { - import TestData._ - import 
TestSQLContext._ + import org.apache.spark.sql.TestData._ + import org.apache.spark.sql.test.TestSQLContext._ test("simple columnar query") { val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan @@ -93,4 +92,16 @@ class InMemoryColumnarQuerySuite extends QueryTest { sql("SELECT time FROM timestamps"), timestamps.collect().toSeq) } + + test("SPARK-3320 regression: batched column buffer building should work with empty partitions") { + checkAnswer( + sql("SELECT * FROM withEmptyParts"), + withEmptyParts.collect().toSeq) + + TestSQLContext.cacheTable("withEmptyParts") + + checkAnswer( + sql("SELECT * FROM withEmptyParts"), + withEmptyParts.collect().toSeq) + } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 6eccb1ba6d4dc..f12b5a69a09f7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -66,9 +66,10 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManage if (!iter.hasNext) { new RowSet() } else { - val maxRows = maxRowsL.toInt // Do you really want a row batch larger than Int Max? No. + // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int + val maxRows = maxRowsL.toInt var curRow = 0 - var rowSet = new ArrayBuffer[Row](maxRows) + var rowSet = new ArrayBuffer[Row](maxRows.min(1024)) while (curRow < maxRows && iter.hasNext) { val sparkRow = iter.next() diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 035fd3214bd1d..b589994bd25fa 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -22,6 +22,7 @@ import java.util.{Locale, TimeZone} import org.scalatest.BeforeAndAfter +import org.apache.spark.sql.SQLConf import org.apache.spark.sql.hive.test.TestHive /** @@ -29,29 +30,31 @@ import org.apache.spark.sql.hive.test.TestHive */ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // TODO: bundle in jar files... 
get from classpath - lazy val hiveQueryDir = TestHive.getHiveFile("ql" + File.separator + "src" + - File.separator + "test" + File.separator + "queries" + File.separator + "clientpositive") + private lazy val hiveQueryDir = TestHive.getHiveFile( + "ql/src/test/queries/clientpositive".split("/").mkString(File.separator)) - var originalTimeZone: TimeZone = _ - var originalLocale: Locale = _ + private val originalTimeZone = TimeZone.getDefault + private val originalLocale = Locale.getDefault + private val originalUseCompression = TestHive.useCompression def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f) override def beforeAll() { + // Enable in-memory columnar caching TestHive.cacheTables = true // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*) - originalTimeZone = TimeZone.getDefault TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) - // Add Locale setting - originalLocale = Locale.getDefault Locale.setDefault(Locale.US) + // Enable in-memory columnar compression + TestHive.setConf(SQLConf.COMPRESS_CACHED, "true") } override def afterAll() { TestHive.cacheTables = false TimeZone.setDefault(originalTimeZone) Locale.setDefault(originalLocale) + TestHive.setConf(SQLConf.COMPRESS_CACHED, originalUseCompression.toString) } /** A list of tests deemed out of scope currently and thus completely disregarded. */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index fa3adfdf5855c..a4dd6be5f9e35 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -889,6 +889,7 @@ private[hive] object HiveQl { val WHEN = "(?i)WHEN".r val CASE = "(?i)CASE".r val SUBSTR = "(?i)SUBSTR(?:ING)?".r + val SQRT = "(?i)SQRT".r protected def nodeToExpr(node: Node): Expression = node match { /* Attribute References */ @@ -958,6 +959,7 @@ private[hive] object HiveQl { case Token(DIV(), left :: right:: Nil) => Cast(Divide(nodeToExpr(left), nodeToExpr(right)), LongType) case Token("%", left :: right:: Nil) => Remainder(nodeToExpr(left), nodeToExpr(right)) + case Token("TOK_FUNCTION", Token(SQRT(), Nil) :: arg :: Nil) => Sqrt(nodeToExpr(arg)) /* Comparisons */ case Token("=", left :: right:: Nil) => EqualTo(nodeToExpr(left), nodeToExpr(right)) diff --git a/sql/hive/src/test/resources/golden/case sensitivity: Hive table-0-5d14d21a239daa42b086cc895215009a b/sql/hive/src/test/resources/golden/case sensitivity when query Hive table-0-5d14d21a239daa42b086cc895215009a similarity index 100% rename from sql/hive/src/test/resources/golden/case sensitivity: Hive table-0-5d14d21a239daa42b086cc895215009a rename to sql/hive/src/test/resources/golden/case sensitivity when query Hive table-0-5d14d21a239daa42b086cc895215009a diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 502ce8fb297e9..671c3b162f875 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -195,6 +195,9 @@ abstract class HiveComparisonTest val installHooksCommand = "(?i)SET.*hooks".r def createQueryTest(testCaseName: String, sql: String, reset: Boolean = true) { + // testCaseName must not contain ':', which is not allowed to appear in a 
filename of Windows + assert(!testCaseName.contains(":")) + // If test sharding is enable, skip tests that are not in the correct shard. shardInfo.foreach { case (shardId, numShards) if testCaseName.hashCode % numShards != shardId => return diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 6d925e56e6838..c4abb3eb4861f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -309,7 +309,7 @@ class HiveQuerySuite extends HiveComparisonTest { } } - createQueryTest("case sensitivity: Hive table", + createQueryTest("case sensitivity when query Hive table", "SELECT srcalias.KEY, SRCALIAS.value FROM sRc SrCAlias WHERE SrCAlias.kEy < 15") test("case sensitivity: registered table") { diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala index 17bf7c2541d13..db58eb642b56d 100644 --- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala +++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala @@ -20,10 +20,11 @@ package org.apache.spark.tools import java.util.concurrent.{CountDownLatch, Executors} import java.util.concurrent.atomic.AtomicLong +import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.SparkContext import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.util.Utils -import org.apache.spark.executor.ShuffleWriteMetrics /** * Internal utility for micro-benchmarking shuffle write performance. @@ -50,13 +51,15 @@ object StoragePerfTester { System.setProperty("spark.shuffle.compress", "false") System.setProperty("spark.shuffle.sync", "true") + System.setProperty("spark.shuffle.manager", + "org.apache.spark.shuffle.hash.HashShuffleManager") // This is only used to instantiate a BlockManager. All thread scheduling is done manually. val sc = new SparkContext("local[4]", "Write Tester") - val blockManager = sc.env.blockManager + val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager] def writeOutputBytes(mapId: Int, total: AtomicLong) = { - val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, + val shuffle = hashShuffleManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, new KryoSerializer(sc.conf), new ShuffleWriteMetrics()) val writers = shuffle.writers for (i <- 1 to recordsPerMap) {