From eadde561ed26ec136273eab928b84fedc4c5fd3a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 23 Oct 2014 12:54:02 -0700 Subject: [PATCH 01/11] Transferred HDFSBackedBlockRDD for the driver-ha-working branch --- .../scala/org/apache/spark/rdd/BlockRDD.scala | 4 + .../streaming/rdd/HDFSBackedBlockRDD.scala | 92 +++++++++++ .../rdd/HDFSBackedBlockRDDSuite.scala | 152 ++++++++++++++++++ 3 files changed, 248 insertions(+) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index 2673ec22509e9..fffa1911f5bc2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -84,5 +84,9 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds "Attempted to use %s after its blocks have been removed!".format(toString)) } } + + protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = { + locations_ + } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala new file mode 100644 index 0000000000000..dec2f1b56b2da --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.rdd + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.BlockRDD +import org.apache.spark.storage.{BlockId, StorageLevel} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader} +import org.apache.spark._ + +private[streaming] +class HDFSBackedBlockRDDPartition( + val blockId: BlockId, idx: Int, val segment: WriteAheadLogFileSegment) extends Partition { + val index = idx +} + +private[streaming] +class HDFSBackedBlockRDD[T: ClassTag]( + @transient sc: SparkContext, + @transient hadoopConfiguration: Configuration, + @transient override val blockIds: Array[BlockId], + @transient val segments: Array[WriteAheadLogFileSegment], + val storeInBlockManager: Boolean, + val storageLevel: StorageLevel + ) extends BlockRDD[T](sc, blockIds) { + + if (blockIds.length != segments.length) { + throw new IllegalStateException("Number of block ids must be the same as number of segments!") + } + + // Hadoop Configuration is not serializable, so broadcast it as a serializable. 
+ val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration)) + .asInstanceOf[Broadcast[SerializableWritable[Configuration]]] + override def getPartitions: Array[Partition] = { + assertValid() + (0 until blockIds.size).map { i => + new HDFSBackedBlockRDDPartition(blockIds(i), i, segments(i)) + }.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + assertValid() + val hadoopConf = broadcastedHadoopConf.value.value + val blockManager = SparkEnv.get.blockManager + val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition] + val blockId = partition.blockId + blockManager.get(blockId) match { + // Data is in Block Manager, grab it from there. + case Some(block) => + block.data.asInstanceOf[Iterator[T]] + // Data not found in Block Manager, grab it from HDFS + case None => + logInfo("Reading partition data from write ahead log " + partition.segment.path) + val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) + val dataRead = reader.read(partition.segment) + reader.close() + // Currently, we support storing the data to BM only in serialized form and not in + // deserialized form + if (storeInBlockManager) { + blockManager.putBytes(blockId, dataRead, storageLevel) + } + dataRead.rewind() + blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] + } + } + + override def getPreferredLocations(split: Partition): Seq[String] = { + val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition] + val locations = getBlockIdLocations() + locations.getOrElse(partition.blockId, + HdfsUtils.getBlockLocations(partition.segment.path, hadoopConfiguration) + .getOrElse(new Array[String](0)).toSeq) + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala new file mode 100644 index 0000000000000..9e1ad312e52fb --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.rdd + +import java.io.File +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer +import org.scalatest.{BeforeAndAfter, FunSuite} + +import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter} +import org.apache.spark.{SparkConf, SparkContext} + +class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter { + val conf = new SparkConf() + .setMaster("local[2]") + .setAppName(this.getClass.getSimpleName) + val sparkContext = new SparkContext(conf) + val hadoopConf = new Configuration() + val blockManager = sparkContext.env.blockManager + // Since the same BM is reused in all tests, use an atomic int to generate ids + val idGenerator = new AtomicInteger(0) + var file: File = null + var dir: File = null + + before { + dir = Files.createTempDir() + file = new File(dir, "BlockManagerWrite") + } + + after { + file.delete() + dir.delete() + } + + test("Data available in BM and HDFS") { + doTestHDFSBackedRDD(5, 5, 20, 5) + } + + test("Data available in in BM but not in HDFS") { + doTestHDFSBackedRDD(5, 0, 20, 5) + } + + test("Data available in in HDFS and not in BM") { + doTestHDFSBackedRDD(0, 5, 20, 5) + } + + test("Data partially available in BM, and the rest in HDFS") { + doTestHDFSBackedRDD(3, 2, 20, 5) + } + + /** + * Write a bunch of events into the HDFS Block RDD. Put a part of all of them to the + * BlockManager, so all reads need not happen from HDFS. + * @param total - Total number of Strings to write + * @param blockCount - Number of blocks to write (therefore, total # of events per block = + * total/blockCount + */ + private def doTestHDFSBackedRDD( + writeToBMCount: Int, + writeToHDFSCount: Int, + total: Int, + blockCount: Int + ) { + val countPerBlock = total / blockCount + val blockIds = (0 until blockCount).map { + i => + StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet()) + } + + val writtenStrings = generateData(total, countPerBlock) + + if (writeToBMCount != 0) { + (0 until writeToBMCount).foreach { i => + blockManager + .putIterator(blockIds(i), writtenStrings(i).iterator, StorageLevel.MEMORY_ONLY_SER) + } + } + + val segments = new ArrayBuffer[WriteAheadLogFileSegment] + if (writeToHDFSCount != 0) { + // Generate some fake segments for the blocks in BM so the RDD does not complain + segments ++= generateFakeSegments(writeToBMCount) + segments ++= writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount), + blockIds.slice(writeToBMCount, blockCount)) + + } else { + segments ++= generateFakeSegments(blockCount) + } + val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray, + segments.toArray, false, StorageLevel.MEMORY_ONLY) + + val dataFromRDD = rdd.collect() + // verify each partition is equal to the data pulled out + assert(writtenStrings.flatten === dataFromRDD) + } + + /** + * Write data to HDFS and get a list of Seq of Seqs in which each Seq represents the data that + * went into one block. + * @param count - Number of Strings to write + * @param countPerBlock - Number of Strings per block + * @return - Tuple of (Seq of Seqs, each of these Seqs is one block, Seq of WriteAheadLogFileSegments, + * each representing the block being written to HDFS. 
+ */ + private def generateData( + count: Int, + countPerBlock: Int + ): Seq[Seq[String]] = { + val strings = (0 until count).map { _ => scala.util.Random.nextString(50)} + strings.grouped(countPerBlock).toSeq + } + + private def writeDataToHDFS( + blockData: Seq[Seq[String]], + blockIds: Seq[BlockId] + ): Seq[WriteAheadLogFileSegment] = { + assert(blockData.size === blockIds.size) + val segments = new ArrayBuffer[WriteAheadLogFileSegment]() + val writer = new WriteAheadLogWriter(file.toString, hadoopConf) + blockData.zip(blockIds).foreach { + case (data, id) => + segments += writer.write(blockManager.dataSerialize(id, data.iterator)) + } + writer.close() + segments + } + + private def generateFakeSegments(count: Int): Seq[WriteAheadLogFileSegment] = { + (0 until count).map { + _ => new WriteAheadLogFileSegment("random", 0l, 0) + } + } +} From 5cce16fc667fa757de3aa76b2986ba174fbc88e1 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Fri, 24 Oct 2014 18:07:11 -0700 Subject: [PATCH 02/11] Make sure getBlockLocations uses offset and length to find the blocks on HDFS --- .../org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala | 3 ++- .../scala/org/apache/spark/streaming/util/HdfsUtils.scala | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala index dec2f1b56b2da..20b3992083ae6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala @@ -86,7 +86,8 @@ class HDFSBackedBlockRDD[T: ClassTag]( val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition] val locations = getBlockIdLocations() locations.getOrElse(partition.blockId, - HdfsUtils.getBlockLocations(partition.segment.path, hadoopConfiguration) + HdfsUtils.getBlockLocations(partition.segment.path, partition.segment.offset, + partition.segment.length, hadoopConfiguration) .getOrElse(new Array[String](0)).toSeq) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index 491f1175576e6..ece3c876d0716 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -52,11 +52,12 @@ private[streaming] object HdfsUtils { } } - def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = { + def getBlockLocations(path: String, offset: Long, length: Long, conf: Configuration): + Option[Array[String]] = { val dfsPath = new Path(path) val dfs = getFileSystemForPath(dfsPath, conf) val fileStatus = dfs.getFileStatus(dfsPath) - val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)) + val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, offset, length)) blockLocs.map(_.flatMap(_.getHosts)) } From 2878c38de3f2f0ba70a6c96d918f42ffbac774a6 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Fri, 24 Oct 2014 23:43:00 -0700 Subject: [PATCH 03/11] Shutdown spark context after tests. 
Formatting/minor fixes --- .../streaming/rdd/HDFSBackedBlockRDD.scala | 15 +++--- .../rdd/HDFSBackedBlockRDDSuite.scala | 51 +++++++++++-------- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala index 20b3992083ae6..c893c800181a4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala @@ -20,7 +20,6 @@ import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration -import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.BlockRDD import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader} @@ -28,9 +27,10 @@ import org.apache.spark._ private[streaming] class HDFSBackedBlockRDDPartition( - val blockId: BlockId, idx: Int, val segment: WriteAheadLogFileSegment) extends Partition { - val index = idx -} + val blockId: BlockId, + val index: Int, + val segment: WriteAheadLogFileSegment + ) extends Partition private[streaming] class HDFSBackedBlockRDD[T: ClassTag]( @@ -42,13 +42,12 @@ class HDFSBackedBlockRDD[T: ClassTag]( val storageLevel: StorageLevel ) extends BlockRDD[T](sc, blockIds) { - if (blockIds.length != segments.length) { - throw new IllegalStateException("Number of block ids must be the same as number of segments!") - } + require(blockIds.length == segments.length, + "Number of block ids must be the same as number of segments!") // Hadoop Configuration is not serializable, so broadcast it as a serializable. val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration)) - .asInstanceOf[Broadcast[SerializableWritable[Configuration]]] + override def getPartitions: Array[Partition] = { assertValid() (0 until blockIds.size).map { i => diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala index 9e1ad312e52fb..7bada0c83f0a3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala @@ -19,17 +19,18 @@ package org.apache.spark.streaming.rdd import java.io.File import java.util.concurrent.atomic.AtomicInteger +import org.apache.spark.{SparkConf, SparkContext} + import scala.collection.mutable.ArrayBuffer -import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} import com.google.common.io.Files import org.apache.hadoop.conf.Configuration import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter} -import org.apache.spark.{SparkConf, SparkContext} -class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter { +class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll { val conf = new SparkConf() .setMaster("local[2]") .setAppName(this.getClass.getSimpleName) @@ -51,6 +52,13 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter { dir.delete() } + override def afterAll(): Unit = { + // Copied from LocalSparkContext which can't be imported since spark-core test-jar does not + // get 
imported properly by sbt even if it is created. + sparkContext.stop() + System.clearProperty("spark.driver.port") + } + test("Data available in BM and HDFS") { doTestHDFSBackedRDD(5, 5, 20, 5) } @@ -70,8 +78,8 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter { /** * Write a bunch of events into the HDFS Block RDD. Put a part of all of them to the * BlockManager, so all reads need not happen from HDFS. - * @param total - Total number of Strings to write - * @param blockCount - Number of blocks to write (therefore, total # of events per block = + * @param total Total number of Strings to write + * @param blockCount Number of blocks to write (therefore, total # of events per block = * total/blockCount */ private def doTestHDFSBackedRDD( @@ -81,8 +89,7 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter { blockCount: Int ) { val countPerBlock = total / blockCount - val blockIds = (0 until blockCount).map { - i => + val blockIds = (0 until blockCount).map { i => StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet()) } @@ -95,16 +102,17 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter { } } - val segments = new ArrayBuffer[WriteAheadLogFileSegment] - if (writeToHDFSCount != 0) { - // Generate some fake segments for the blocks in BM so the RDD does not complain - segments ++= generateFakeSegments(writeToBMCount) - segments ++= writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount), - blockIds.slice(writeToBMCount, blockCount)) - - } else { - segments ++= generateFakeSegments(blockCount) + val segments = { + if (writeToHDFSCount != 0) { + // Generate some fake segments for the blocks in BM so the RDD does not complain + generateFakeSegments(writeToBMCount) ++ + writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount), + blockIds.slice(writeToBMCount, blockCount)) + } else { + generateFakeSegments(blockCount) + } } + val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray, segments.toArray, false, StorageLevel.MEMORY_ONLY) @@ -116,10 +124,9 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter { /** * Write data to HDFS and get a list of Seq of Seqs in which each Seq represents the data that * went into one block. - * @param count - Number of Strings to write - * @param countPerBlock - Number of Strings per block - * @return - Tuple of (Seq of Seqs, each of these Seqs is one block, Seq of WriteAheadLogFileSegments, - * each representing the block being written to HDFS. + * @param count Number of Strings to write + * @param countPerBlock Number of Strings per block + * @return Seq of Seqs, each of these Seqs is one block */ private def generateData( count: Int, @@ -130,8 +137,8 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter { } private def writeDataToHDFS( - blockData: Seq[Seq[String]], - blockIds: Seq[BlockId] + blockData: Seq[Seq[String]], + blockIds: Seq[BlockId] ): Seq[WriteAheadLogFileSegment] = { assert(blockData.size === blockIds.size) val segments = new ArrayBuffer[WriteAheadLogFileSegment]() From 6e1bfb8dae4eb829c59dd6baf423b93bd4d2971f Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 27 Oct 2014 12:17:53 -0700 Subject: [PATCH 04/11] Tweaks testuite to create spark contxt lazily to prevent contxt leaks. 
--- .../rdd/HDFSBackedBlockRDDSuite.scala | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala index 7bada0c83f0a3..70897e4c0a8cf 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala @@ -27,22 +27,24 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} import com.google.common.io.Files import org.apache.hadoop.conf.Configuration -import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} +import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, StreamBlockId} import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter} class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll { val conf = new SparkConf() .setMaster("local[2]") .setAppName(this.getClass.getSimpleName) - val sparkContext = new SparkContext(conf) val hadoopConf = new Configuration() - val blockManager = sparkContext.env.blockManager // Since the same BM is reused in all tests, use an atomic int to generate ids val idGenerator = new AtomicInteger(0) + + var sparkContext: SparkContext = null + var blockManager: BlockManager = null var file: File = null var dir: File = null before { + blockManager = sparkContext.env.blockManager dir = Files.createTempDir() file = new File(dir, "BlockManagerWrite") } @@ -52,6 +54,10 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAn dir.delete() } + override def beforeAll(): Unit = { + sparkContext = new SparkContext(conf) + } + override def afterAll(): Unit = { // Copied from LocalSparkContext which can't be imported since spark-core test-jar does not // get imported properly by sbt even if it is created. @@ -60,19 +66,19 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAn } test("Data available in BM and HDFS") { - doTestHDFSBackedRDD(5, 5, 20, 5) + testHDFSBackedRDD(5, 5, 20, 5) } test("Data available in in BM but not in HDFS") { - doTestHDFSBackedRDD(5, 0, 20, 5) + testHDFSBackedRDD(5, 0, 20, 5) } test("Data available in in HDFS and not in BM") { - doTestHDFSBackedRDD(0, 5, 20, 5) + testHDFSBackedRDD(0, 5, 20, 5) } test("Data partially available in BM, and the rest in HDFS") { - doTestHDFSBackedRDD(3, 2, 20, 5) + testHDFSBackedRDD(3, 2, 20, 5) } /** @@ -82,7 +88,7 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAn * @param blockCount Number of blocks to write (therefore, total # of events per block = * total/blockCount */ - private def doTestHDFSBackedRDD( + private def testHDFSBackedRDD( writeToBMCount: Int, writeToHDFSCount: Int, total: Int, @@ -152,8 +158,6 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAn } private def generateFakeSegments(count: Int): Seq[WriteAheadLogFileSegment] = { - (0 until count).map { - _ => new WriteAheadLogFileSegment("random", 0l, 0) - } + (0 until count).map { _ => new WriteAheadLogFileSegment("random", 0l, 0) } } } From 9e47b5b74b4d568c00827e1bf2fbe15e24e2cba1 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 28 Oct 2014 14:42:13 -0700 Subject: [PATCH 05/11] Renamed class, simplified+added unit tests. 
--- .../streaming/rdd/HDFSBackedBlockRDD.scala | 92 ---------- .../rdd/WriteAheadLogBackedBlockRDD.scala | 116 +++++++++++++ .../spark/streaming/util/HdfsUtils.scala | 3 +- .../rdd/HDFSBackedBlockRDDSuite.scala | 163 ------------------ .../WriteAheadLogBackedBlockRDDSuite.scala | 144 ++++++++++++++++ 5 files changed, 261 insertions(+), 257 deletions(-) delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala delete mode 100644 streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala deleted file mode 100644 index c893c800181a4..0000000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.rdd - -import scala.reflect.ClassTag - -import org.apache.hadoop.conf.Configuration - -import org.apache.spark.rdd.BlockRDD -import org.apache.spark.storage.{BlockId, StorageLevel} -import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader} -import org.apache.spark._ - -private[streaming] -class HDFSBackedBlockRDDPartition( - val blockId: BlockId, - val index: Int, - val segment: WriteAheadLogFileSegment - ) extends Partition - -private[streaming] -class HDFSBackedBlockRDD[T: ClassTag]( - @transient sc: SparkContext, - @transient hadoopConfiguration: Configuration, - @transient override val blockIds: Array[BlockId], - @transient val segments: Array[WriteAheadLogFileSegment], - val storeInBlockManager: Boolean, - val storageLevel: StorageLevel - ) extends BlockRDD[T](sc, blockIds) { - - require(blockIds.length == segments.length, - "Number of block ids must be the same as number of segments!") - - // Hadoop Configuration is not serializable, so broadcast it as a serializable. 
- val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration)) - - override def getPartitions: Array[Partition] = { - assertValid() - (0 until blockIds.size).map { i => - new HDFSBackedBlockRDDPartition(blockIds(i), i, segments(i)) - }.toArray - } - - override def compute(split: Partition, context: TaskContext): Iterator[T] = { - assertValid() - val hadoopConf = broadcastedHadoopConf.value.value - val blockManager = SparkEnv.get.blockManager - val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition] - val blockId = partition.blockId - blockManager.get(blockId) match { - // Data is in Block Manager, grab it from there. - case Some(block) => - block.data.asInstanceOf[Iterator[T]] - // Data not found in Block Manager, grab it from HDFS - case None => - logInfo("Reading partition data from write ahead log " + partition.segment.path) - val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) - val dataRead = reader.read(partition.segment) - reader.close() - // Currently, we support storing the data to BM only in serialized form and not in - // deserialized form - if (storeInBlockManager) { - blockManager.putBytes(blockId, dataRead, storageLevel) - } - dataRead.rewind() - blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] - } - } - - override def getPreferredLocations(split: Partition): Seq[String] = { - val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition] - val locations = getBlockIdLocations() - locations.getOrElse(partition.blockId, - HdfsUtils.getBlockLocations(partition.segment.path, partition.segment.offset, - partition.segment.length, hadoopConfiguration) - .getOrElse(new Array[String](0)).toSeq) - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala new file mode 100644 index 0000000000000..065603cb34552 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.rdd + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration +import org.apache.spark._ +import org.apache.spark.rdd.BlockRDD +import org.apache.spark.storage.{BlockId, StorageLevel} +import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, WriteAheadLogRandomReader} + +/** + * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]]. + * It contains information about the id of the blocks having this partition's data and + * the segment of the write ahead log that backs the partition. 
+ * @param index index of the partition + * @param blockId id of the block having the partition data + * @param segment segment of the write ahead log having the partition data + */ +private[streaming] +class WriteAheadLogBackedBlockRDDPartition( + val index: Int, + val blockId: BlockId, + val segment: WriteAheadLogFileSegment + ) extends Partition + + +/** + * This class represents a special case of the BlockRDD where the data blocks in the block manager are also + * backed by segments in write ahead logs. For reading the data, this RDD first looks up the blocks by their ids + * in the block manager. If it does not find them, it looks up the corresponding file segment. + * + * @param sc SparkContext + * @param hadoopConfiguration Hadoop configuration + * @param blockIds Ids of the blocks that contains this RDD's data + * @param segments Segments in write ahead logs that contain this RDD's data + * @param storeInBlockManager Whether to store in the block manager after reading from the log segment + * @param storageLevel storage level to store when storing in block manager (applicable when storeInBlockManager = true) + */ +private[streaming] +class WriteAheadLogBackedBlockRDD[T: ClassTag]( + @transient sc: SparkContext, + @transient hadoopConfiguration: Configuration, + @transient override val blockIds: Array[BlockId], + @transient val segments: Array[WriteAheadLogFileSegment], + val storeInBlockManager: Boolean, + val storageLevel: StorageLevel + ) extends BlockRDD[T](sc, blockIds) { + + require(blockIds.length == segments.length, + s"Number of block ids (${blockIds.length}) must be the same as number of segments (${segments.length}})!") + + // Hadoop configuration is not serializable, so broadcast it as a serializable. + private val broadcastedHadoopConf = new SerializableWritable(hadoopConfiguration) + + override def getPartitions: Array[Partition] = { + assertValid() + Array.tabulate(blockIds.size){ i => new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) } + } + + /** + * Gets the partition data by getting the corresponding block from the block manager. If the block does not + * exist, then the data is read from the corresponding segment in write ahead log files. + */ + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + assertValid() + val hadoopConf = broadcastedHadoopConf.value + val blockManager = SparkEnv.get.blockManager + val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition] + val blockId = partition.blockId + blockManager.get(blockId) match { + case Some(block) => // Data is in Block Manager + val iterator = block.data.asInstanceOf[Iterator[T]] + logDebug(s"Read partition data of RDD $this from block manager, block $blockId") + iterator + case None => // Data not found in Block Manager, grab it from write ahead log file + val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) + val dataRead = reader.read(partition.segment) + reader.close() + logInfo(s"Read partition data of RDD $this from write ahead log, segment ${partition.segment}") + if (storeInBlockManager) { + blockManager.putBytes(blockId, dataRead, storageLevel) + logDebug(s"Stored partition data of RDD $this into block manager with level $storageLevel") + dataRead.rewind() + } + blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] + } + } + + /** + * Get the preferred location of the partition. 
This returns the locations of the block if it is present in the + * block manager, else it returns the location of the corresponding segment in HDFS. + */ + override def getPreferredLocations(split: Partition): Seq[String] = { + val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition] + val blockLocations = getBlockIdLocations().get(partition.blockId) + lazy val segmentLocations = HdfsUtils.getBlockLocations( + partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfiguration) + blockLocations.orElse(segmentLocations).getOrElse(Seq.empty) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index ece3c876d0716..9e278ec888595 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -52,8 +52,7 @@ private[streaming] object HdfsUtils { } } - def getBlockLocations(path: String, offset: Long, length: Long, conf: Configuration): - Option[Array[String]] = { + def getBlockLocations(path: String, offset: Long, length: Long, conf: Configuration): Option[Seq[String]] = { val dfsPath = new Path(path) val dfs = getFileSystemForPath(dfsPath, conf) val fileStatus = dfs.getFileStatus(dfsPath) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala deleted file mode 100644 index 70897e4c0a8cf..0000000000000 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.streaming.rdd - -import java.io.File -import java.util.concurrent.atomic.AtomicInteger - -import org.apache.spark.{SparkConf, SparkContext} - -import scala.collection.mutable.ArrayBuffer -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} - -import com.google.common.io.Files -import org.apache.hadoop.conf.Configuration - -import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, StreamBlockId} -import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter} - -class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll { - val conf = new SparkConf() - .setMaster("local[2]") - .setAppName(this.getClass.getSimpleName) - val hadoopConf = new Configuration() - // Since the same BM is reused in all tests, use an atomic int to generate ids - val idGenerator = new AtomicInteger(0) - - var sparkContext: SparkContext = null - var blockManager: BlockManager = null - var file: File = null - var dir: File = null - - before { - blockManager = sparkContext.env.blockManager - dir = Files.createTempDir() - file = new File(dir, "BlockManagerWrite") - } - - after { - file.delete() - dir.delete() - } - - override def beforeAll(): Unit = { - sparkContext = new SparkContext(conf) - } - - override def afterAll(): Unit = { - // Copied from LocalSparkContext which can't be imported since spark-core test-jar does not - // get imported properly by sbt even if it is created. - sparkContext.stop() - System.clearProperty("spark.driver.port") - } - - test("Data available in BM and HDFS") { - testHDFSBackedRDD(5, 5, 20, 5) - } - - test("Data available in in BM but not in HDFS") { - testHDFSBackedRDD(5, 0, 20, 5) - } - - test("Data available in in HDFS and not in BM") { - testHDFSBackedRDD(0, 5, 20, 5) - } - - test("Data partially available in BM, and the rest in HDFS") { - testHDFSBackedRDD(3, 2, 20, 5) - } - - /** - * Write a bunch of events into the HDFS Block RDD. Put a part of all of them to the - * BlockManager, so all reads need not happen from HDFS. - * @param total Total number of Strings to write - * @param blockCount Number of blocks to write (therefore, total # of events per block = - * total/blockCount - */ - private def testHDFSBackedRDD( - writeToBMCount: Int, - writeToHDFSCount: Int, - total: Int, - blockCount: Int - ) { - val countPerBlock = total / blockCount - val blockIds = (0 until blockCount).map { i => - StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet()) - } - - val writtenStrings = generateData(total, countPerBlock) - - if (writeToBMCount != 0) { - (0 until writeToBMCount).foreach { i => - blockManager - .putIterator(blockIds(i), writtenStrings(i).iterator, StorageLevel.MEMORY_ONLY_SER) - } - } - - val segments = { - if (writeToHDFSCount != 0) { - // Generate some fake segments for the blocks in BM so the RDD does not complain - generateFakeSegments(writeToBMCount) ++ - writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount), - blockIds.slice(writeToBMCount, blockCount)) - } else { - generateFakeSegments(blockCount) - } - } - - val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray, - segments.toArray, false, StorageLevel.MEMORY_ONLY) - - val dataFromRDD = rdd.collect() - // verify each partition is equal to the data pulled out - assert(writtenStrings.flatten === dataFromRDD) - } - - /** - * Write data to HDFS and get a list of Seq of Seqs in which each Seq represents the data that - * went into one block. 
- * @param count Number of Strings to write - * @param countPerBlock Number of Strings per block - * @return Seq of Seqs, each of these Seqs is one block - */ - private def generateData( - count: Int, - countPerBlock: Int - ): Seq[Seq[String]] = { - val strings = (0 until count).map { _ => scala.util.Random.nextString(50)} - strings.grouped(countPerBlock).toSeq - } - - private def writeDataToHDFS( - blockData: Seq[Seq[String]], - blockIds: Seq[BlockId] - ): Seq[WriteAheadLogFileSegment] = { - assert(blockData.size === blockIds.size) - val segments = new ArrayBuffer[WriteAheadLogFileSegment]() - val writer = new WriteAheadLogWriter(file.toString, hadoopConf) - blockData.zip(blockIds).foreach { - case (data, id) => - segments += writer.write(blockManager.dataSerialize(id, data.iterator)) - } - writer.close() - segments - } - - private def generateFakeSegments(count: Int): Seq[WriteAheadLogFileSegment] = { - (0 until count).map { _ => new WriteAheadLogFileSegment("random", 0l, 0) } - } -} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala new file mode 100644 index 0000000000000..c72a38cd7ae61 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.rdd + +import java.io.File + +import scala.util.Random + +import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter} +import org.scalatest.{BeforeAndAfterAll, FunSuite} + +class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll { + val conf = new SparkConf() + .setMaster("local[2]") + .setAppName(this.getClass.getSimpleName) + val hadoopConf = new Configuration() + + var sparkContext: SparkContext = null + var blockManager: BlockManager = null + var dir: File = null + + override def beforeAll(): Unit = { + sparkContext = new SparkContext(conf) + blockManager = sparkContext.env.blockManager + dir = Files.createTempDir() + } + + override def afterAll(): Unit = { + // Copied from LocalSparkContext, simpler than to introduced test dependencies to core tests. 
+ sparkContext.stop() + dir.delete() + System.clearProperty("spark.driver.port") + } + + test("Read data available in block manager and write ahead log") { + testRDD(5, 5) + } + + test("Read data available only in block manager, not in write ahead log") { + testRDD(5, 0) + } + + test("Read data available only in write ahead log, not in block manager") { + testRDD(0, 5) + } + + test("Read data available only in write ahead log, and test storing in block manager") { + testRDD(0, 5, testStoreInBM = true) + } + + test("Read data with partially available in block manager, and rest in write ahead log") { + testRDD(3, 2) + } + + /** + * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to Block Manager and the rest + * to a WriteAheadLog, and then reading reading it all back using the RDD. + * It can also test if the partitions that were read from the log were again stored in block manager. + * @param numPartitionssInBM Number of partitions to write to the Block Manager + * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log + * @param testStoreInBM Test whether blocks read from log are stored back into block manager + */ + private def testRDD(numPartitionssInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) { + val numBlocks = numPartitionssInBM + numPartitionsInWAL + val data = Seq.tabulate(numBlocks) { _ => Seq.fill(10) { scala.util.Random.nextString(50) } } + + // Put the necessary blocks in the block manager + val blockIds = Array.fill(numBlocks)(StreamBlockId(Random.nextInt(), Random.nextInt())) + data.zip(blockIds).take(numPartitionssInBM).foreach { case(block, blockId) => + blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER) + } + + // Generate write ahead log segments + val segments = generateFakeSegments(numPartitionssInBM) ++ + writeLogSegments(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL)) + + // Make sure that the left `numPartitionssInBM` blocks are in block manager, and others are not + require( + blockIds.take(numPartitionssInBM).forall(blockManager.get(_).nonEmpty), + "Expected blocks not in BlockManager" + ) + require( + blockIds.takeRight(numPartitionsInWAL).forall(blockManager.get(_).isEmpty), + "Unexpected blocks in BlockManager" + ) + + // Make sure that the right `numPartitionsInWAL` blocks are in write ahead logs, and other are not + require( + segments.takeRight(numPartitionsInWAL).forall(s => new File(s.path.stripPrefix("file://")).exists()), + "Expected blocks not in write ahead log" + ) + require( + segments.take(numPartitionssInBM).forall(s => !new File(s.path.stripPrefix("file://")).exists()), + "Unexpected blocks in write ahead log" + ) + + // Create the RDD and verify whether the returned data is correct + val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray, + segments.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY) + assert(rdd.collect() === data.flatten) + + if (testStoreInBM) { + val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray, + segments.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY) + assert(rdd2.collect() === data.flatten) + assert( + blockIds.forall(blockManager.get(_).nonEmpty), + "All blocks not found in block manager" + ) + } + } + + private def writeLogSegments(blockData: Seq[Seq[String]], blockIds: Seq[BlockId]): Seq[WriteAheadLogFileSegment] = { + require(blockData.size === blockIds.size) + val writer = new 
WriteAheadLogWriter(new File(dir, Random.nextString(10)).toString, hadoopConf) + val segments = blockData.zip(blockIds).map { case (data, id) => + writer.write(blockManager.dataSerialize(id, data.iterator)) + } + writer.close() + segments + } + + private def generateFakeSegments(count: Int): Seq[WriteAheadLogFileSegment] = { + Array.fill(count)(new WriteAheadLogFileSegment("random", 0l, 0)) + } +} From 29aa0999ac72055bb51eeb6d8b51e4d21c8828ba Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 28 Oct 2014 15:58:28 -0700 Subject: [PATCH 06/11] Fixed line length issues. --- .../rdd/WriteAheadLogBackedBlockRDD.scala | 45 +++++++++++-------- .../WriteAheadLogBackedBlockRDDSuite.scala | 24 +++++++--- 2 files changed, 43 insertions(+), 26 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index 065603cb34552..375e1dd0978a8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -41,41 +41,47 @@ class WriteAheadLogBackedBlockRDDPartition( /** - * This class represents a special case of the BlockRDD where the data blocks in the block manager are also - * backed by segments in write ahead logs. For reading the data, this RDD first looks up the blocks by their ids - * in the block manager. If it does not find them, it looks up the corresponding file segment. + * This class represents a special case of the BlockRDD where the data blocks in + * the block manager are also backed by segments in write ahead logs. For reading + * the data, this RDD first looks up the blocks by their ids in the block manager. + * If it does not find them, it looks up the corresponding file segment. * * @param sc SparkContext - * @param hadoopConfiguration Hadoop configuration + * @param hadoopConfig Hadoop configuration * @param blockIds Ids of the blocks that contains this RDD's data * @param segments Segments in write ahead logs that contain this RDD's data - * @param storeInBlockManager Whether to store in the block manager after reading from the log segment - * @param storageLevel storage level to store when storing in block manager (applicable when storeInBlockManager = true) + * @param storeInBlockManager Whether to store in the block manager after reading from the segment + * @param storageLevel storage level to store when storing in block manager + * (applicable when storeInBlockManager = true) */ private[streaming] class WriteAheadLogBackedBlockRDD[T: ClassTag]( @transient sc: SparkContext, - @transient hadoopConfiguration: Configuration, + @transient hadoopConfig: Configuration, @transient override val blockIds: Array[BlockId], @transient val segments: Array[WriteAheadLogFileSegment], val storeInBlockManager: Boolean, val storageLevel: StorageLevel ) extends BlockRDD[T](sc, blockIds) { - require(blockIds.length == segments.length, - s"Number of block ids (${blockIds.length}) must be the same as number of segments (${segments.length}})!") + require( + blockIds.length == segments.length, + s"Number of block ids (${blockIds.length}) must be " + + s"the same as number of segments (${segments.length}})!") // Hadoop configuration is not serializable, so broadcast it as a serializable. 
- private val broadcastedHadoopConf = new SerializableWritable(hadoopConfiguration) + private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig) override def getPartitions: Array[Partition] = { assertValid() - Array.tabulate(blockIds.size){ i => new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) } + Array.tabulate(blockIds.size) { i => + new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) } } /** - * Gets the partition data by getting the corresponding block from the block manager. If the block does not - * exist, then the data is read from the corresponding segment in write ahead log files. + * Gets the partition data by getting the corresponding block from the block manager. + * If the block does not exist, then the data is read from the corresponding segment + * in write ahead log files. */ override def compute(split: Partition, context: TaskContext): Iterator[T] = { assertValid() @@ -86,16 +92,16 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( blockManager.get(blockId) match { case Some(block) => // Data is in Block Manager val iterator = block.data.asInstanceOf[Iterator[T]] - logDebug(s"Read partition data of RDD $this from block manager, block $blockId") + logDebug(s"Read partition data of $this from block manager, block $blockId") iterator case None => // Data not found in Block Manager, grab it from write ahead log file val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) val dataRead = reader.read(partition.segment) reader.close() - logInfo(s"Read partition data of RDD $this from write ahead log, segment ${partition.segment}") + logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}") if (storeInBlockManager) { blockManager.putBytes(blockId, dataRead, storageLevel) - logDebug(s"Stored partition data of RDD $this into block manager with level $storageLevel") + logDebug(s"Stored partition data of $this into block manager with level $storageLevel") dataRead.rewind() } blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] @@ -103,14 +109,15 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( } /** - * Get the preferred location of the partition. This returns the locations of the block if it is present in the - * block manager, else it returns the location of the corresponding segment in HDFS. + * Get the preferred location of the partition. This returns the locations of the block + * if it is present in the block manager, else it returns the location of the + * corresponding segment in HDFS. 
*/ override def getPreferredLocations(split: Partition): Seq[String] = { val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition] val blockLocations = getBlockIdLocations().get(partition.blockId) lazy val segmentLocations = HdfsUtils.getBlockLocations( - partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfiguration) + partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig) blockLocations.orElse(segmentLocations).getOrElse(Seq.empty) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index c72a38cd7ae61..e7e880e052688 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -71,14 +71,19 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll { } /** - * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to Block Manager and the rest - * to a WriteAheadLog, and then reading reading it all back using the RDD. - * It can also test if the partitions that were read from the log were again stored in block manager. + * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager + * and the rest to a write ahead log, and then reading reading it all back using the RDD. + * It can also test if the partitions that were read from the log were again stored in + * block manager. * @param numPartitionssInBM Number of partitions to write to the Block Manager * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log * @param testStoreInBM Test whether blocks read from log are stored back into block manager */ - private def testRDD(numPartitionssInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) { + private def testRDD( + numPartitionssInBM: Int, + numPartitionsInWAL: Int, + testStoreInBM: Boolean = false + ) { val numBlocks = numPartitionssInBM + numPartitionsInWAL val data = Seq.tabulate(numBlocks) { _ => Seq.fill(10) { scala.util.Random.nextString(50) } } @@ -104,11 +109,13 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll { // Make sure that the right `numPartitionsInWAL` blocks are in write ahead logs, and other are not require( - segments.takeRight(numPartitionsInWAL).forall(s => new File(s.path.stripPrefix("file://")).exists()), + segments.takeRight(numPartitionsInWAL).forall(s => + new File(s.path.stripPrefix("file://")).exists()), "Expected blocks not in write ahead log" ) require( - segments.take(numPartitionssInBM).forall(s => !new File(s.path.stripPrefix("file://")).exists()), + segments.take(numPartitionssInBM).forall(s => + !new File(s.path.stripPrefix("file://")).exists()), "Unexpected blocks in write ahead log" ) @@ -128,7 +135,10 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll { } } - private def writeLogSegments(blockData: Seq[Seq[String]], blockIds: Seq[BlockId]): Seq[WriteAheadLogFileSegment] = { + private def writeLogSegments( + blockData: Seq[Seq[String]], + blockIds: Seq[BlockId] + ): Seq[WriteAheadLogFileSegment] = { require(blockData.size === blockIds.size) val writer = new WriteAheadLogWriter(new File(dir, Random.nextString(10)).toString, hadoopConf) val segments = blockData.zip(blockIds).map { case (data, id) => 
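
As a quick illustration of the API introduced by the preceding patches, the sketch below drives a WriteAheadLogBackedBlockRDD directly, mirroring what WriteAheadLogBackedBlockRDDSuite does. It is a minimal sketch only: the package, object, and method names are hypothetical, the SparkContext, Hadoop Configuration, block ids, and write ahead log segments are assumed to be supplied by the caller, and because the RDD is private[streaming] such code has to live under the org.apache.spark.streaming package (as the test suite does).

    // Hypothetical sub-package; needed only because the RDD is private[streaming].
    package org.apache.spark.streaming.example

    import org.apache.hadoop.conf.Configuration

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.{BlockId, StorageLevel}
    import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
    import org.apache.spark.streaming.util.WriteAheadLogFileSegment

    object WALBackedRDDSketch {
      /**
       * Reads back the data of the given blocks: blocks still present in the block manager
       * are served from there, the rest are read from their write ahead log segments.
       */
      def readBackedBlocks(
          sc: SparkContext,
          hadoopConf: Configuration,
          blockIds: Array[BlockId],
          segments: Array[WriteAheadLogFileSegment]): Array[String] = {
        // One log segment per block id, as required by the RDD's own precondition.
        require(blockIds.length == segments.length)
        val rdd = new WriteAheadLogBackedBlockRDD[String](
          sc, hadoopConf, blockIds, segments,
          storeInBlockManager = true,               // re-insert blocks recovered from the log
          storageLevel = StorageLevel.MEMORY_ONLY_SER)
        rdd.collect()
      }
    }

With storeInBlockManager = true, partitions recovered from the log are put back into the block manager at the given storage level, which is the behaviour the testStoreInBM case of the suite above verifies.
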
From 20aa7c67078e72061fe79dd2d50916c398e46c5f Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 28 Oct 2014 16:58:41 -0700 Subject: [PATCH 07/11] Fixed more line length issues. --- .../spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 2 +- .../scala/org/apache/spark/streaming/util/HdfsUtils.scala | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index 375e1dd0978a8..5623e7f95ab51 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -116,7 +116,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( override def getPreferredLocations(split: Partition): Seq[String] = { val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition] val blockLocations = getBlockIdLocations().get(partition.blockId) - lazy val segmentLocations = HdfsUtils.getBlockLocations( + lazy val segmentLocations = HdfsUtils.getFileSegmentLocations( partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig) blockLocations.orElse(segmentLocations).getOrElse(Seq.empty) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index 9e278ec888595..59947891cb6c3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -52,7 +52,9 @@ private[streaming] object HdfsUtils { } } - def getBlockLocations(path: String, offset: Long, length: Long, conf: Configuration): Option[Seq[String]] = { + /** Get the locations of the HDFS blocks containing the given file segment. */ + def getFileSegmentLocations( + path: String, offset: Long, length: Long, conf: Configuration): Option[Seq[String]] = { val dfsPath = new Path(path) val dfs = getFileSystemForPath(dfsPath, conf) val fileStatus = dfs.getFileStatus(dfsPath) From b0a18b1562ef4821fe3aa09555f26c14540e868b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 28 Oct 2014 17:01:25 -0700 Subject: [PATCH 08/11] Fixed import order. 
---
 .../spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala      | 1 +
 .../spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala | 3 ++-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 5623e7f95ab51..3e07e38036a01 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.rdd
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark._
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index e7e880e052688..23aebb53811d9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -22,10 +22,11 @@ import scala.util.Random
 
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
 class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
   val conf = new SparkConf()

From ed5fbf0765136da963f6a8447f1ff69191825392 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 29 Oct 2014 21:32:44 -0700
Subject: [PATCH 09/11] Minor updates.
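
Among the changes below, `lazy val segmentLocations` becomes a `def`. Either form keeps the HDFS lookup from running when the block manager already knows the block's location; the sketch here (standalone, hypothetical names, not part of the diff) just illustrates the evaluation-timing difference between val, lazy val and def:

    object EvaluationTimingSketch {
      def lookup(tag: String): Seq[String] = {
        println(s"$tag lookup ran")
        Seq("dn1")
      }

      def main(args: Array[String]): Unit = {
        val eager     = lookup("val")       // runs right here, wanted or not
        lazy val once = lookup("lazy val")  // deferred, cached after first use
        def everyTime = lookup("def")       // deferred, re-evaluated on each use

        // Option.orElse and Option.getOrElse take their arguments by name, so with
        // a Some on the left neither `once` nor `everyTime` is ever evaluated.
        println(Some(Seq("cached-host")).orElse(Some(everyTime)).getOrElse(once))
      }
    }

The test change from Seq.tabulate(numBlocks) { _ => Seq.fill(10)(...) } to Seq.fill(numBlocks, 10)(...) is behaviour-preserving: the two-argument Seq.fill builds numBlocks inner sequences of 10 elements each.
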
---
 .../rdd/WriteAheadLogBackedBlockRDD.scala      | 21 ++++++++++---------
 .../WriteAheadLogBackedBlockRDDSuite.scala     |  2 +-
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 3e07e38036a01..1c7b7787e19f4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -37,8 +37,8 @@ private[streaming]
 class WriteAheadLogBackedBlockRDDPartition(
     val index: Int,
     val blockId: BlockId,
-    val segment: WriteAheadLogFileSegment
-  ) extends Partition
+    val segment: WriteAheadLogFileSegment)
+  extends Partition
 
 
 /**
@@ -59,11 +59,11 @@ private[streaming]
 class WriteAheadLogBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
     @transient hadoopConfig: Configuration,
-    @transient override val blockIds: Array[BlockId],
-    @transient val segments: Array[WriteAheadLogFileSegment],
-    val storeInBlockManager: Boolean,
-    val storageLevel: StorageLevel
-  ) extends BlockRDD[T](sc, blockIds) {
+    @transient blockIds: Array[BlockId],
+    @transient segments: Array[WriteAheadLogFileSegment],
+    storeInBlockManager: Boolean,
+    storageLevel: StorageLevel)
+  extends BlockRDD[T](sc, blockIds) {
 
   require(
     blockIds.length == segments.length,
@@ -76,7 +76,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
   override def getPartitions: Array[Partition] = {
     assertValid()
     Array.tabulate(blockIds.size) { i =>
-      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) }
+      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i))
+    }
   }
 
   /**
@@ -116,8 +117,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
    */
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
-    val blockLocations = getBlockIdLocations().get(partition.blockId)
-    lazy val segmentLocations = HdfsUtils.getFileSegmentLocations(
+    def blockLocations = getBlockIdLocations().get(partition.blockId)
+    def segmentLocations = HdfsUtils.getFileSegmentLocations(
       partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)
     blockLocations.orElse(segmentLocations).getOrElse(Seq.empty)
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index 23aebb53811d9..458c0ee7ca530 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -86,7 +86,7 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
       testStoreInBM: Boolean = false
     ) {
     val numBlocks = numPartitionssInBM + numPartitionsInWAL
-    val data = Seq.tabulate(numBlocks) { _ => Seq.fill(10) { scala.util.Random.nextString(50) } }
+    val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(50))
 
     // Put the necessary blocks in the block manager
     val blockIds = Array.fill(numBlocks)(StreamBlockId(Random.nextInt(), Random.nextInt()))

From 4a5866fb19fcd4fde7551294a30bd740500c834a Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Thu, 30 Oct 2014 03:42:37 -0700
Subject: [PATCH 10/11] Addressed one more comment.
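
The rename below (numPartitionssInBM to numPartitionsInBM) only touches the test helper, which splits the generated blocks between the block manager and the write ahead log. A hypothetical, self-contained sketch of that split, for illustration only:

    object BlockSplitSketch {
      def main(args: Array[String]): Unit = {
        val numPartitionsInBM = 3
        val numPartitionsInWAL = 2
        val numBlocks = numPartitionsInBM + numPartitionsInWAL

        // Same shape as the test data: numBlocks blocks of 10 short strings each.
        val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(5))

        val storedInBlockManager = data.take(numPartitionsInBM)         // would go through putIterator
        val writtenToWriteAheadLog = data.takeRight(numPartitionsInWAL) // would go through the log writer

        println(s"${storedInBlockManager.size} blocks in the block manager, " +
          s"${writtenToWriteAheadLog.size} blocks in the write ahead log")
      }
    }
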
---
 .../WriteAheadLogBackedBlockRDDSuite.scala     | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index 458c0ee7ca530..10160244bcc91 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -76,31 +76,27 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
    * and the rest to a write ahead log, and then reading reading it all back using the RDD.
    * It can also test if the partitions that were read from the log were again stored in
    * block manager.
-   * @param numPartitionssInBM Number of partitions to write to the Block Manager
+   * @param numPartitionsInBM Number of partitions to write to the Block Manager
    * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log
    * @param testStoreInBM Test whether blocks read from log are stored back into block manager
    */
-  private def testRDD(
-      numPartitionssInBM: Int,
-      numPartitionsInWAL: Int,
-      testStoreInBM: Boolean = false
-    ) {
-    val numBlocks = numPartitionssInBM + numPartitionsInWAL
+  private def testRDD(numPartitionsInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) {
+    val numBlocks = numPartitionsInBM + numPartitionsInWAL
     val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(50))
 
     // Put the necessary blocks in the block manager
     val blockIds = Array.fill(numBlocks)(StreamBlockId(Random.nextInt(), Random.nextInt()))
-    data.zip(blockIds).take(numPartitionssInBM).foreach { case(block, blockId) =>
+    data.zip(blockIds).take(numPartitionsInBM).foreach { case(block, blockId) =>
       blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
     }
 
     // Generate write ahead log segments
-    val segments = generateFakeSegments(numPartitionssInBM) ++
+    val segments = generateFakeSegments(numPartitionsInBM) ++
       writeLogSegments(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL))
 
-    // Make sure that the left `numPartitionssInBM` blocks are in block manager, and others are not
+    // Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not
     require(
-      blockIds.take(numPartitionssInBM).forall(blockManager.get(_).nonEmpty),
+      blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty),
       "Expected blocks not in BlockManager"
     )
     require(
@@ -115,7 +111,7 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
       "Expected blocks not in write ahead log"
     )
     require(
-      segments.take(numPartitionssInBM).forall(s =>
+      segments.take(numPartitionsInBM).forall(s =>
         !new File(s.path.stripPrefix("file://")).exists()),
       "Unexpected blocks in write ahead log"
     )

From 209e49cf7b7467f5ba10f9352745823104f6a332 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Thu, 30 Oct 2014 13:42:57 -0700
Subject: [PATCH 11/11] Better fix to style issue.
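
The final shape below keeps the cheap block-manager lookup as a val and the HDFS lookup as a def, relying on the by-name parameter of Option.getOrElse so the def only runs on the fallback path. A standalone sketch of that combination (hypothetical names, not the patch itself):

    object FinalFallbackSketch {
      def blockManagerLookup(blockCached: Boolean): Option[Seq[String]] =
        if (blockCached) Some(Seq("executor-host")) else None

      def hdfsLookup(): Seq[String] = {
        println("querying HDFS for segment locations")
        Seq("dn1", "dn2")
      }

      def preferredLocations(blockCached: Boolean): Seq[String] = {
        val blockLocations = blockManagerLookup(blockCached)  // cheap, evaluated once
        def segmentLocations = hdfsLookup()                   // only runs on the fallback path
        blockLocations.getOrElse(segmentLocations)
      }

      def main(args: Array[String]): Unit = {
        println(preferredLocations(blockCached = true))   // no HDFS call is made
        println(preferredLocations(blockCached = false))  // prints the HDFS message, then the hosts
      }
    }

Dropping the Option from getFileSegmentLocations (it now returns a possibly empty Array[String]) is what lets the caller shrink from orElse(...).getOrElse(...) to a single getOrElse.
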
---
 .../spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 4 ++--
 .../scala/org/apache/spark/streaming/util/HdfsUtils.scala | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 1c7b7787e19f4..23295bf658712 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -117,9 +117,9 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
    */
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
-    def blockLocations = getBlockIdLocations().get(partition.blockId)
+    val blockLocations = getBlockIdLocations().get(partition.blockId)
     def segmentLocations = HdfsUtils.getFileSegmentLocations(
       partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)
-    blockLocations.orElse(segmentLocations).getOrElse(Seq.empty)
+    blockLocations.getOrElse(segmentLocations)
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index 59947891cb6c3..27a28bab83ed5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -54,12 +54,12 @@ private[streaming] object HdfsUtils {
 
   /** Get the locations of the HDFS blocks containing the given file segment. */
   def getFileSegmentLocations(
-      path: String, offset: Long, length: Long, conf: Configuration): Option[Seq[String]] = {
+      path: String, offset: Long, length: Long, conf: Configuration): Array[String] = {
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
     val fileStatus = dfs.getFileStatus(dfsPath)
     val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, offset, length))
-    blockLocs.map(_.flatMap(_.getHosts))
+    blockLocs.map(_.flatMap(_.getHosts)).getOrElse(Array.empty)
   }
 
   def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {
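
For completeness, a standalone sketch of what the segment-location helper does under the hood: it asks the filesystem which hosts hold the blocks covering the (path, offset, length) range, which is what getFileSegmentLocations above wraps. The file name and local-filesystem usage here are illustrative assumptions only:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    object SegmentLocationsSketch {
      /** Hosts that hold the HDFS blocks covering the given byte range of a file. */
      def segmentLocations(path: String, offset: Long, length: Long, conf: Configuration): Array[String] = {
        val fsPath = new Path(path)
        val fs = fsPath.getFileSystem(conf)    // resolves hdfs://, file://, etc.
        val status = fs.getFileStatus(fsPath)  // throws if the file does not exist
        val blocks = Option(fs.getFileBlockLocations(status, offset, length))
        blocks.map(_.flatMap(_.getHosts)).getOrElse(Array.empty)
      }

      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // On a local filesystem an existing file typically reports "localhost".
        println(segmentLocations("/tmp/existing-log-file", 0L, 100L, conf).toSeq)
      }
    }
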