
Commit

splitting append func into two parts: grow/append; doubling the size when growing; UnsupportedOperationException instead of sys.error
cxzl25 committed May 14, 2018
1 parent d9d8e62 commit 22a2767
Showing 1 changed file with 33 additions and 18 deletions.
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.types.LongType
import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.util.{KnownSizeEstimation, Utils}

@@ -362,6 +363,8 @@ private[joins] object UnsafeHashedRelation {
private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, capacity: Int)
extends MemoryConsumer(mm) with Externalizable with KryoSerializable {

+private val ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
// Whether the keys are stored in dense mode or not.
private var isDense = false

@@ -557,7 +560,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
def append(key: Long, row: UnsafeRow): Unit = {
val sizeInBytes = row.getSizeInBytes
if (sizeInBytes >= (1 << SIZE_BITS)) {
sys.error("Does not support row that is larger than 256M")
throw new UnsupportedOperationException("Does not support row that is larger than 256M")
}

if (key < minKey) {
@@ -567,22 +570,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
maxKey = key
}

-// There is 8 bytes for the pointer to next value
-val needSize = cursor + 8 + row.getSizeInBytes
-val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
-if (needSize > nowSize) {
-  val used = page.length
-  if (used >= (1 << 30)) {
-    sys.error("Can not build a HashedRelation that is larger than 8G")
-  }
-  val multiples = math.max(math.ceil(needSize.toDouble / (used * 8L)).toInt, 2)
-  ensureAcquireMemory(used * 8L * multiples)
-  val newPage = new Array[Long](used * multiples)
-  Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
-    cursor - Platform.LONG_ARRAY_OFFSET)
-  page = newPage
-  freeMemory(used * 8L)
-}
+grow(row.getSizeInBytes)

// copy the bytes of UnsafeRow
val offset = cursor
@@ -618,7 +606,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
growArray()
} else if (numKeys > array.length / 2 * 0.75) {
// The fill ratio should be less than 0.75
sys.error("Cannot build HashedRelation with more than 1/3 billions unique keys")
throw new UnsupportedOperationException(
"Cannot build HashedRelation with more than 1/3 billions unique keys")
}
}
} else {
@@ -629,6 +618,32 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
}
}

+private def grow(neededSize: Int): Unit = {
+  // There is 8 bytes for the pointer to next value
+  val totalNeededSize = cursor + 8 + neededSize
+  val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
+  if (totalNeededSize > nowSize) {
+    val used = page.length
+    if (used >= (1 << 30)) {
+      throw new UnsupportedOperationException(
+        "Can not build a HashedRelation that is larger than 8G")
+    }
+    val multiples = math.floor(totalNeededSize.toDouble / nowSize).toInt * 2
+    val newLength = used * multiples
+    if (newLength > ARRAY_MAX) {
+      throw new UnsupportedOperationException(
+        "Cannot grow internal buffer by size " + newLength +
+        " because the size after growing " + "exceeds size limitation " + ARRAY_MAX)
+    }
+    ensureAcquireMemory(newLength * 8L)
+    val newPage = new Array[Long](newLength)
+    Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
+      cursor - Platform.LONG_ARRAY_OFFSET)
+    page = newPage
+    freeMemory(used * 8L)
+  }
+}
+
private def growArray(): Unit = {
var old_array = array
val n = array.length
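For readers skimming the change, here is a minimal, self-contained sketch of the growth strategy the new grow helper follows: when the backing long array runs out of room, it is reallocated to a multiple of its current size (at least doubled), with the new length checked against ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH. Everything below (the GrowSketch object, SimpleGrowableLongBuffer, ensureCapacity, and the hard-coded cap) is an illustrative assumption, not Spark code; the real grow() also acquires and frees task memory through MemoryConsumer and rounds the multiple slightly differently, which this sketch omits.

// Illustrative sketch only; not the Spark implementation. Names and the cap
// value are assumptions made for demonstration.
object GrowSketch {
  // Assumed stand-in for ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH.
  val ArrayMax: Int = Int.MaxValue - 15

  final class SimpleGrowableLongBuffer(initialWords: Int) {
    private var page: Array[Long] = new Array[Long](initialWords)
    private var cursorWords: Int = 0 // number of words already written

    // Make room for `neededWords` more words, growing the page by at least 2x.
    def ensureCapacity(neededWords: Int): Unit = {
      val totalNeeded = cursorWords.toLong + neededWords
      if (totalNeeded > page.length) {
        // Grow to a multiple of the current size that covers the request,
        // and never less than double (the "doubling" idea of this commit).
        val multiples = math.max(math.ceil(totalNeeded.toDouble / page.length).toInt, 2)
        val newLength = page.length.toLong * multiples
        if (newLength > ArrayMax) {
          throw new UnsupportedOperationException(
            s"Cannot grow internal buffer to $newLength words; limit is $ArrayMax")
        }
        val newPage = new Array[Long](newLength.toInt)
        System.arraycopy(page, 0, newPage, 0, cursorWords)
        page = newPage
      }
    }

    def append(value: Long): Unit = {
      ensureCapacity(1)
      page(cursorWords) = value
      cursorWords += 1
    }

    def size: Int = cursorWords
  }

  def main(args: Array[String]): Unit = {
    val buf = new SimpleGrowableLongBuffer(4)
    (1L to 100L).foreach(buf.append) // triggers several doublings: 4 -> 8 -> 16 -> ...
    println(s"appended ${buf.size} longs")
  }
}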
