Skip to content

Commit

Permalink
address patrick's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
haoyuan committed Mar 28, 2014
1 parent 589eafe commit 91fa09d
Showing 1 changed file with 21 additions and 5 deletions.
26 changes: 21 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.storage

import java.io.IOException
import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -109,13 +110,28 @@ private class TachyonStore(

override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
val file = tachyonManager.getFile(blockId)
if (file == null || file.getLocationHosts().size == 0) {
return None
}
val is = file.getInStream(ReadType.CACHE)
var buffer: ByteBuffer = null
if (is != null){
val size = file.length
val bs = new Array[Byte](size.asInstanceOf[Int])
is.read(bs, 0, size.asInstanceOf[Int])
buffer = ByteBuffer.wrap(bs)
try {
if (is != null) {
val size = file.length
val bs = new Array[Byte](size.asInstanceOf[Int])
val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
buffer = ByteBuffer.wrap(bs)
if (fetchSize != size) {
logWarning("Failed to fetch the block " + blockId + " from Tachyon : Size " + size +
" is not equal to fetched size " + fetchSize)
return None
}
}
} catch {
case ioe: IOException => {
logWarning("Failed to fetch the block " + blockId + " from Tachyon", ioe)
return None
}
}
Some(buffer)
}
Expand Down

0 comments on commit 91fa09d

Please sign in to comment.