[SPARK-10399][SPARK-23879][CORE][SQL] Introduce multiple MemoryBlocks to choose several types of memory block #19222

Closed
wants to merge 66 commits
Changes from all commits (66 commits)
f427ca2
introduce ByteArrayMemoryBlock, IntArrayMemoryBlock, LongArrayMemoryB…
kiszk Sep 13, 2017
5d7ccdb
OffHeapColumnVector uses UnsafeMemoryAllocator
kiszk Sep 13, 2017
251fa09
UTF8String uses UnsafeMemoryAllocator
kiszk Sep 13, 2017
790bbe7
Platform.copyMemory() in UnsafeInMemorySorter uses new MemoryBlock
kiszk Sep 13, 2017
93a792e
address review comments
kiszk Sep 14, 2017
0beab03
fix test failures (e.g. String in UnsafeArrayData)
kiszk Sep 14, 2017
fcf764c
fix failures
kiszk Sep 18, 2017
d2d2e50
minor update of UTF8String constructor
kiszk Sep 21, 2017
f5e10bb
rename method name
kiszk Sep 22, 2017
1905e8c
remove unused code
kiszk Sep 22, 2017
7778e58
update arrayEquals
kiszk Sep 22, 2017
4f96c82
rebase master
kiszk Sep 22, 2017
d1d6ae9
make more methods final
kiszk Sep 22, 2017
914dcd1
make fill method final in MemoryBlock
kiszk Sep 22, 2017
336e4b7
fix test failures
kiszk Sep 23, 2017
5be9ccb
add testsuite
kiszk Sep 24, 2017
43e6b57
pass concrete type to the first argument of Platform.get*/put* to get…
kiszk Sep 24, 2017
05f024e
rename methods related to hash
kiszk Sep 28, 2017
9071cf6
added methods for MemoryBlock
kiszk Sep 28, 2017
37ee9fa
rebase with master
kiszk Sep 28, 2017
d0b5d59
fix scala style error
kiszk Sep 28, 2017
5cdad44
use MemoryBlock in Murmur3 for performance reason
kiszk Oct 14, 2017
91028fa
fix typo in comment
kiszk Oct 14, 2017
0210bd1
address review comment
kiszk Oct 29, 2017
df6dad3
rebase with master
kiszk Nov 28, 2017
1fa47a8
fix failures
kiszk Feb 20, 2018
01f9c8e
fix failures in ArrowColumnVectorSuite and FeatureHasherSuite
kiszk Feb 20, 2018
2ed8f82
address review comments
kiszk Feb 21, 2018
5e3afd1
address review comments
kiszk Feb 23, 2018
95fbdee
fix test failures
kiszk Feb 24, 2018
9b4975b
fix test failures
kiszk Feb 25, 2018
9e2697c
remove MemoryBlock offset at caller site for allocate() and copyMemory()
kiszk Feb 25, 2018
8cd4853
provide generic copyFrom() and writeTo() in MemoryBlock
kiszk Feb 25, 2018
1bed048
address review comments
kiszk Feb 26, 2018
c79585f
address review comments
kiszk Feb 27, 2018
77cdb81
fix indent
kiszk Feb 27, 2018
ee5a798
remove assert for performance
kiszk Feb 27, 2018
c9f401a
address review comments
kiszk Mar 2, 2018
cf2d532
fix compilation failure
kiszk Mar 2, 2018
eb0cc6d
address review comment
kiszk Mar 2, 2018
6f57994
fix test failures
kiszk Mar 2, 2018
3a93d61
fix test failures
kiszk Mar 3, 2018
abf6ba0
reduce duplicated code in hash
kiszk Mar 3, 2018
4567781
address review comments
kiszk Mar 3, 2018
95ffd0f
address review comments
kiszk Mar 3, 2018
9cbb876
add review comment
kiszk Mar 4, 2018
291203c
add review comments
kiszk Mar 4, 2018
a62770b
address review comments
kiszk Mar 6, 2018
f9bc4d6
address review comments
kiszk Mar 7, 2018
c6d45ea
address review comments
kiszk Mar 16, 2018
5284593
address review comment
kiszk Mar 17, 2018
b1750a1
address review comments
kiszk Mar 20, 2018
38ddf48
address review comment
kiszk Mar 20, 2018
4e46a18
make MemoryBlock.length mutable for reuse
kiszk Mar 22, 2018
511d58d
address review comment
kiszk Mar 26, 2018
1939fda
include MemoryBlock.offset in get/putXXX
kiszk Mar 26, 2018
cb4b30b
Merge branch 'SPARK-10399' of github.com:kiszk/spark into SPARK-10399
kiszk Mar 26, 2018
c53b6b8
address review comments
kiszk Mar 27, 2018
45aa736
fix test failure
kiszk Mar 27, 2018
8690e43
address review comment
kiszk Mar 29, 2018
59fd393
fix test failures
kiszk Mar 29, 2018
b69cb64
address yesterday's review comment
kiszk Mar 29, 2018
94c9648
address review comment
kiszk Mar 29, 2018
e4a7016
address review comments
kiszk Apr 5, 2018
3d03f92
Address review comments
kiszk Apr 5, 2018
50326ca
Address review comments
kiszk Apr 5, 2018
File: HiveHasher.java
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions;

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;

/**
* Simulates Hive's hashing function from Hive v1.2.1
@@ -38,12 +39,17 @@ public static int hashLong(long input) {
return (int) ((input >>> 32) ^ input);
}

public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
public static int hashUnsafeBytesBlock(MemoryBlock mb) {
long lengthInBytes = mb.size();
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int result = 0;
for (int i = 0; i < lengthInBytes; i++) {
result = (result * 31) + (int) Platform.getByte(base, offset + i);
for (long i = 0; i < lengthInBytes; i++) {
result = (result * 31) + (int) mb.getByte(i);
}
return result;
}

public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes));
}
}
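The HiveHasher change above routes the old (Object, offset, length) entry point through a MemoryBlock view of the same bytes. Below is a minimal sketch of the underlying polynomial hash (result = result * 31 + byte) over a plain byte[] slice in place of Spark's MemoryBlock; `HiveHashSketch` and `hashSlice` are hypothetical names, not Spark API:

```java
// Hive-style polynomial byte hash, sketched over a plain byte[] slice.
// Same recurrence as hashUnsafeBytesBlock above; illustrative only.
public class HiveHashSketch {
    static int hashSlice(byte[] base, int offset, int length) {
        int result = 0;
        for (int i = 0; i < length; i++) {
            result = (result * 31) + (int) base[offset + i];
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] data = "spark".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        // For ASCII input this 31-based recurrence coincides with
        // java.lang.String.hashCode(), which uses the same Horner scheme.
        System.out.println(hashSlice(data, 0, data.length) == "spark".hashCode()); // true
    }
}
```

Because the hash only ever reads one byte at a logical index, wrapping the (base, offset) pair in a block-style view changes nothing about the result, which is why the PR can forward `hashUnsafeBytes` to `hashUnsafeBytesBlock`.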
File: Platform.java
@@ -187,7 +187,7 @@ public static void setMemory(long address, byte value, long size) {
}

public static void copyMemory(
Object src, long srcOffset, Object dst, long dstOffset, long length) {
Object src, long srcOffset, Object dst, long dstOffset, long length) {
// Check if dstOffset is before or after srcOffset to determine if we should copy
// forward or backwards. This is necessary in case src and dst overlap.
if (dstOffset < srcOffset) {
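The comment in copyMemory above describes the classic memmove direction rule: copy front-to-back when the destination precedes the source, back-to-front otherwise, so overlapping ranges are never clobbered mid-copy. A self-contained sketch, with a plain long[] standing in for Unsafe memory (names illustrative):

```java
// Overlap-safe copy within one array, mirroring Platform.copyMemory's
// direction check. Copying forward when dst > src would overwrite
// source elements before they are read.
public class OverlapCopySketch {
    static void copy(long[] a, int src, int dst, int len) {
        if (dst < src) {
            for (int i = 0; i < len; i++) a[dst + i] = a[src + i];      // forward
        } else {
            for (int i = len - 1; i >= 0; i--) a[dst + i] = a[src + i]; // backward
        }
    }

    public static void main(String[] args) {
        long[] a = {1, 2, 3, 4, 0, 0};
        copy(a, 0, 2, 4);  // overlapping shift right by two slots
        System.out.println(java.util.Arrays.toString(a)); // [1, 2, 1, 2, 3, 4]
    }
}
```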
File: ByteArrayMethods.java
@@ -18,6 +18,7 @@
package org.apache.spark.unsafe.array;

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;

public class ByteArrayMethods {

@@ -48,6 +49,16 @@ public static int roundNumberOfBytesToNearestWord(int numBytes) {
public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;

private static final boolean unaligned = Platform.unaligned();
/**
* Optimized equality check for MemoryBlocks.
* @return true if the arrays are equal, false otherwise
*/
public static boolean arrayEqualsBlock(
Contributor: no one is calling it.
Member Author: This method is called from UTF8String class.
Member: I think this only works for ByteArrayMemoryBlock, doesn't it?

MemoryBlock leftBase, long leftOffset, MemoryBlock rightBase, long rightOffset, final long length) {
return arrayEquals(leftBase.getBaseObject(), leftBase.getBaseOffset() + leftOffset,
rightBase.getBaseObject(), rightBase.getBaseOffset() + rightOffset, length);
}

/**
* Optimized byte array equality check for byte arrays.
* @return true if the arrays are equal, false otherwise
@@ -56,7 +67,7 @@ public static boolean arrayEquals(
Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
int i = 0;

// check if stars align and we can get both offsets to be aligned
// check if starts align and we can get both offsets to be aligned
if ((leftOffset % 8) == (rightOffset % 8)) {
while ((leftOffset + i) % 8 != 0 && i < length) {
if (Platform.getByte(leftBase, leftOffset + i) !=
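The arrayEquals code above first steps byte-by-byte until both offsets are 8-byte aligned, then compares a whole word per iteration. The core word-at-a-time idea can be sketched with ByteBuffer long views standing in for Platform.getLong (illustrative, not the Spark code path, which also handles bases whose offsets differ mod 8):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Word-at-a-time equality: compare 8 bytes per step via long views,
// then finish the unaligned tail byte-by-byte.
public class WordEqualsSketch {
    static boolean wordEquals(byte[] left, byte[] right) {
        if (left.length != right.length) return false;
        ByteBuffer l = ByteBuffer.wrap(left).order(ByteOrder.LITTLE_ENDIAN);
        ByteBuffer r = ByteBuffer.wrap(right).order(ByteOrder.LITTLE_ENDIAN);
        int i = 0;
        for (; i + 8 <= left.length; i += 8) {   // whole 8-byte words
            if (l.getLong(i) != r.getLong(i)) return false;
        }
        for (; i < left.length; i++) {           // remaining tail bytes
            if (left[i] != right[i]) return false;
        }
        return true;
    }
}
```

The alignment preamble in the real code exists because word loads are only cheap (or legal at all, on some platforms) when both addresses share the same alignment, which is what the "check if starts align" comment refers to.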
File: LongArray.java
@@ -17,7 +17,6 @@

package org.apache.spark.unsafe.array;

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;

/**
@@ -33,16 +32,12 @@ public final class LongArray {
private static final long WIDTH = 8;

private final MemoryBlock memory;
private final Object baseObj;
private final long baseOffset;

private final long length;

public LongArray(MemoryBlock memory) {
assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
this.memory = memory;
this.baseObj = memory.getBaseObject();
this.baseOffset = memory.getBaseOffset();
this.length = memory.size() / WIDTH;
}

@@ -51,11 +46,11 @@ public MemoryBlock memoryBlock() {
}

public Object getBaseObject() {
return baseObj;
return memory.getBaseObject();
}

public long getBaseOffset() {
return baseOffset;
return memory.getBaseOffset();
}

/**
@@ -69,8 +64,8 @@ public long size() {
* Fill this all with 0L.
*/
public void zeroOut() {
for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
Platform.putLong(baseObj, off, 0);
for (long off = 0; off < length * WIDTH; off += WIDTH) {
memory.putLong(off, 0);
}
}

@@ -80,7 +75,7 @@ public void zeroOut() {
public void set(int index, long value) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < length : "index (" + index + ") should < length (" + length + ")";
Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
Contributor: update it to use 0-based offset.
memory.putLong(index * WIDTH, value);
}

/**
@@ -89,6 +84,6 @@ public long get(int index) {
public long get(int index) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < length : "index (" + index + ") should < length (" + length + ")";
return Platform.getLong(baseObj, baseOffset + index * WIDTH);
return memory.getLong(index * WIDTH);
}
}
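The LongArray refactor above drops the cached (baseObj, baseOffset) pair and addresses elements with 0-based byte offsets delegated to the block. A sketch of that indexing pattern over a minimal heap-backed block; `HeapBlock` and `LongArraySketch` are stand-ins, not Spark's MemoryBlock or LongArray:

```java
// 0-based offset indexing, as in the refactored LongArray: the wrapper
// keeps only the block and computes byte offsets as index * 8.
public class LongArraySketch {
    // Minimal stand-in for MemoryBlock: 0-based byte offsets, longs only.
    static final class HeapBlock {
        private final long[] words;
        HeapBlock(int numWords) { words = new long[numWords]; }
        long size() { return words.length * 8L; }
        long getLong(long off) { return words[(int) (off / 8)]; }
        void putLong(long off, long v) { words[(int) (off / 8)] = v; }
    }

    static final long WIDTH = 8;  // bytes per element, as in LongArray
    private final HeapBlock memory;
    private final long length;

    LongArraySketch(HeapBlock memory) {
        this.memory = memory;
        this.length = memory.size() / WIDTH;
    }

    void set(int index, long value) {
        assert index >= 0 && index < length;
        memory.putLong(index * WIDTH, value);   // 0-based, no baseOffset
    }

    long get(int index) {
        assert index >= 0 && index < length;
        return memory.getLong(index * WIDTH);
    }
}
```

Moving the baseObj/baseOffset bookkeeping behind the block is what lets one LongArray implementation work unchanged over on-heap and off-heap blocks.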
File: Murmur3_x86_32.java
@@ -17,7 +17,9 @@

package org.apache.spark.unsafe.hash;

import org.apache.spark.unsafe.Platform;
import com.google.common.primitives.Ints;

import org.apache.spark.unsafe.memory.MemoryBlock;

/**
* 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction.
@@ -49,49 +51,66 @@ public static int hashInt(int input, int seed) {
}

public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
return hashUnsafeWords(base, offset, lengthInBytes, seed);
return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}

public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
int lengthInBytes = Ints.checkedCast(base.size());
assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
int h1 = hashBytesByIntBlock(base, seed);
return fmix(h1, lengthInBytes);
}

public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}

public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
// This is not compatible with the original and other implementations,
// but it is kept for backward compatibility with components that existed before 2.3.
int lengthInBytes = Ints.checkedCast(base.size());
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
for (int i = lengthAligned; i < lengthInBytes; i++) {
int halfWord = Platform.getByte(base, offset + i);
int halfWord = base.getByte(i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
return fmix(h1, lengthInBytes);
}

public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}

public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
return hashUnsafeBytes2Block(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}

public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) {
// This is compatible with the original and other implementations.
// Use this method for new components after Spark 2.3.
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int lengthInBytes = Ints.checkedCast(base.size());
assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
int k1 = 0;
for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
k1 ^= (base.getByte(i) & 0xFF) << shift;
}
h1 ^= mixK1(k1);
return fmix(h1, lengthInBytes);
}

private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
private static int hashBytesByIntBlock(MemoryBlock base, int seed) {
long lengthInBytes = base.size();
assert (lengthInBytes % 4 == 0);
int h1 = seed;
for (int i = 0; i < lengthInBytes; i += 4) {
int halfWord = Platform.getInt(base, offset + i);
for (long i = 0; i < lengthInBytes; i += 4) {
int halfWord = base.getInt(i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
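All of the *Block variants above funnel into the same Murmur3 mixing pipeline: mixK1 scrambles each 4-byte word, mixH1 folds it into the running state, and fmix finalizes. A self-contained sketch of the 32-bit core with the published Murmur3 constants, matching the structure of hashInt/fmix in this file (the class name is illustrative):

```java
// Murmur3 32-bit core: per-word mixing plus finalization, using the
// standard published constants.
public class Murmur3Sketch {
    static int mixK1(int k1) {
        k1 *= 0xcc9e2d51;
        k1 = Integer.rotateLeft(k1, 15);
        k1 *= 0x1b873593;
        return k1;
    }

    static int mixH1(int h1, int k1) {
        h1 ^= k1;
        h1 = Integer.rotateLeft(h1, 13);
        return h1 * 5 + 0xe6546b64;
    }

    static int fmix(int h1, int length) {  // final avalanche
        h1 ^= length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    static int hashInt(int input, int seed) {
        return fmix(mixH1(seed, mixK1(input)), 4);  // 4 = bytes hashed
    }
}
```

The PR's refactor does not touch this pipeline; it only changes how the input bytes are fetched (MemoryBlock getByte/getInt instead of Platform calls with an explicit base and offset), which is why hashUnsafeBytes and hashUnsafeBytesBlock must produce identical results.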
File: ByteArrayMemoryBlock.java (new file)
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.unsafe.memory;

import com.google.common.primitives.Ints;

import org.apache.spark.unsafe.Platform;

/**
* A consecutive block of memory with a byte array on Java heap.
*/
public final class ByteArrayMemoryBlock extends MemoryBlock {

private final byte[] array;

public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
super(obj, offset, size);
this.array = obj;
assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
Contributor: shall we require offset >= Platform.BYTE_ARRAY_OFFSET?
Member Author (@kiszk, Mar 29, 2018): In general, I agree with you. However, adding this assertion causes a failure (an assertion error here) in UTF8StringSuite.writeToOutputStreamUnderflow().
Member: hmm, from here and the accessors below, it looks like the given offset includes Platform.BYTE_ARRAY_OFFSET? What will happen if a given offset is less than that?

"The sum of size " + size + " and offset " + offset + " should not be larger than " +
"the size of the given memory space " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
}

public ByteArrayMemoryBlock(long length) {
this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, length);
}

@Override
public MemoryBlock subBlock(long offset, long size) {
checkSubBlockRange(offset, size);
if (offset == 0 && size == this.size()) return this;
return new ByteArrayMemoryBlock(array, this.offset + offset, size);
}

public byte[] getByteArray() { return array; }

/**
* Creates a memory block pointing to the memory used by the byte array.
*/
public static ByteArrayMemoryBlock fromArray(final byte[] array) {
return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length);
}

@Override
public final int getInt(long offset) {
return Platform.getInt(array, this.offset + offset);
}

@Override
public final void putInt(long offset, int value) {
Platform.putInt(array, this.offset + offset, value);
}

@Override
public final boolean getBoolean(long offset) {
return Platform.getBoolean(array, this.offset + offset);
}

@Override
public final void putBoolean(long offset, boolean value) {
Platform.putBoolean(array, this.offset + offset, value);
}

@Override
public final byte getByte(long offset) {
return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
Member: Why not use Platform.getByte?
Member Author: Suggestion from @cloud-fan. Java array access is faster than Platform.getByte.

}

@Override
public final void putByte(long offset, byte value) {
array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)] = value;
Member: ditto.
Member Author: ditto

}

@Override
public final short getShort(long offset) {
return Platform.getShort(array, this.offset + offset);
}

@Override
public final void putShort(long offset, short value) {
Platform.putShort(array, this.offset + offset, value);
}

@Override
public final long getLong(long offset) {
return Platform.getLong(array, this.offset + offset);
}

@Override
public final void putLong(long offset, long value) {
Platform.putLong(array, this.offset + offset, value);
}

@Override
public final float getFloat(long offset) {
return Platform.getFloat(array, this.offset + offset);
}

@Override
public final void putFloat(long offset, float value) {
Platform.putFloat(array, this.offset + offset, value);
}

@Override
public final double getDouble(long offset) {
return Platform.getDouble(array, this.offset + offset);
}

@Override
public final void putDouble(long offset, double value) {
Platform.putDouble(array, this.offset + offset, value);
}
}
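ByteArrayMemoryBlock above reconciles two addressing schemes: Unsafe-style absolute offsets (which include the array header, Platform.BYTE_ARRAY_OFFSET) and plain array indices. Its getByte subtracts the header constant to index the backing array directly, per the review exchange above. A sketch of that arithmetic; `BASE` and `OffsetSketch` are illustrative stand-ins, and the value 16 is invented for the example:

```java
// Two addressing schemes in one accessor: the block stores an absolute
// base offset (header included), callers pass 0-based logical offsets,
// and direct array indexing needs the header subtracted back out.
public class OffsetSketch {
    static final long BASE = 16;  // stand-in for Platform.BYTE_ARRAY_OFFSET

    // Mirrors ByteArrayMemoryBlock.getByte(offset):
    // array[(int)(this.offset + offset - BYTE_ARRAY_OFFSET)]
    static byte getByte(byte[] array, long blockOffset, long logicalOffset) {
        return array[(int) (blockOffset + logicalOffset - BASE)];
    }

    public static void main(String[] args) {
        byte[] data = {10, 20, 30, 40};
        // A block wrapping the whole array starts at blockOffset == BASE,
        // so logical offset 2 lands on array index 2.
        System.out.println(getByte(data, BASE, 2));     // 30
        // A sub-block starting at logical index 3 carries blockOffset
        // BASE + 3; its logical offset 0 lands on array index 3.
        System.out.println(getByte(data, BASE + 3, 0)); // 40
    }
}
```

This also shows why the constructor's assertion debate matters: if a caller hands in a blockOffset below BASE, the subtraction yields a negative array index, whereas Unsafe-based accessors would silently read out of bounds.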