forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' of https://github.com/apache/spark
- Loading branch information
Showing 53 changed files with 729 additions and 507 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
121 changes: 121 additions & 0 deletions
121
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.shuffle | ||
|
||
import java.io._ | ||
import java.nio.ByteBuffer | ||
|
||
import org.apache.spark.SparkEnv | ||
import org.apache.spark.storage._ | ||
|
||
/**
 * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
 * Data of shuffle blocks from the same map task are stored in a single consolidated data file.
 * The offsets of the data blocks in the data file are stored in a separate index file.
 *
 * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
 * as the filename postfix for data file, and ".index" as the filename postfix for index file.
 */
private[spark]
class IndexShuffleBlockManager extends ShuffleBlockManager {

  // Lazy so that construction does not require a live SparkEnv.
  private lazy val blockManager = SparkEnv.get.blockManager

  /**
   * Mapping to a single shuffleBlockId with reduce ID 0.
   */
  def consolidateId(shuffleId: Int, mapId: Int): ShuffleBlockId = {
    ShuffleBlockId(shuffleId, mapId, 0)
  }

  /** Consolidated data file holding all of this map task's shuffle output. */
  def getDataFile(shuffleId: Int, mapId: Int): File = {
    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0))
  }

  /** Index file recording the offset of each reduce partition within the data file. */
  private def getIndexFile(shuffleId: Int, mapId: Int): File = {
    blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
  }

  /**
   * Remove data file and index file that contain the output data from one map.
   */
  def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
    var file = getDataFile(shuffleId, mapId)
    if (file.exists()) {
      file.delete()
    }

    file = getIndexFile(shuffleId, mapId)
    if (file.exists()) {
      file.delete()
    }
  }

  /**
   * Write an index file with the offsets of each block, plus a final offset at the end for the
   * end of the output file. This will be used by getBlockLocation to figure out where each block
   * begins and ends.
   */
  def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
    val indexFile = getIndexFile(shuffleId, mapId)
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
    try {
      // We take in lengths of each block, need to convert it to offsets.
      var offset = 0L
      out.writeLong(offset)

      for (length <- lengths) {
        offset += length
        out.writeLong(offset)
      }
    } finally {
      out.close()
    }
  }

  /**
   * Get the location of a block in a map output file. Uses the index file we create for it.
   *
   * @throws IOException if the index file ends before the requested entry is reached.
   */
  private def getBlockLocation(blockId: ShuffleBlockId): FileSegment = {
    // The block is actually going to be a range of a single map output file for this map, so
    // find out the consolidated file, then the offset within that from our index
    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      // InputStream.skip may skip fewer bytes than requested, so loop until the full
      // distance has been covered. Use Long arithmetic to avoid Int overflow.
      var toSkip = blockId.reduceId * 8L
      while (toSkip > 0) {
        val skipped = in.skip(toSkip)
        if (skipped <= 0) {
          throw new IOException(
            s"Unexpected end of index file $indexFile while seeking block $blockId")
        }
        toSkip -= skipped
      }
      val offset = in.readLong()
      val nextOffset = in.readLong()
      new FileSegment(getDataFile(blockId.shuffleId, blockId.mapId), offset, nextOffset - offset)
    } finally {
      in.close()
    }
  }

  override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
    val segment = getBlockLocation(blockId)
    blockManager.diskStore.getBytes(segment)
  }

  override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] = {
    // blockId is already statically typed as ShuffleBlockId; no cast needed.
    Left(getBlockLocation(blockId))
  }

  override def stop(): Unit = {}
}
38 changes: 38 additions & 0 deletions
38
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.shuffle | ||
|
||
import java.nio.ByteBuffer | ||
|
||
import org.apache.spark.storage.{FileSegment, ShuffleBlockId} | ||
|
||
/**
 * An interface for retrieving shuffle block data managed on the local node.
 *
 * Implementations decide how logical shuffle blocks map onto physical storage
 * (e.g. one file per block, or a consolidated file plus an index).
 */
private[spark]
trait ShuffleBlockManager {
  /** Alias documenting that shuffles are identified by plain Ints. */
  type ShuffleId = Int

  /**
   * Get shuffle block data managed by the local ShuffleBlockManager.
   * @return Some(ByteBuffer) if block found, otherwise None.
   */
  def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer]

  /**
   * Fetch the block either as an on-disk segment (Left) or as an
   * in-memory buffer (Right), whichever the implementation holds.
   */
  def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer]

  /** Release any resources held by this manager. */
  def stop(): Unit
}
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.