Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-10399][SPARK-23879][CORE][SQL] Introduce multiple MemoryBlocks to choose several types of memory block #19222

Closed
wants to merge 66 commits into from

Conversation

kiszk
Copy link
Member

@kiszk kiszk commented Sep 13, 2017

What changes were proposed in this pull request?

This PR allows us to use one of several types of MemoryBlock, such as byte array, int array, long array, or java.nio.DirectByteBuffer. To use java.nio.DirectByteBuffer allows to have off heap memory which is automatically deallocated by JVM. MemoryBlock class has primitive accessors like Platform.getInt(), Platform.putint(), or Platform.copyMemory().

This PR uses MemoryBlock for OffHeapColumnVector, UTF8String, and other places. This PR can improve performance of operations involving memory accesses (e.g. UTF8String.trim) by 1.8x.

For now, this PR does not use MemoryBlock for BufferHolder based on @cloud-fan's suggestion.

Since this PR is a successor of #11494, close #11494. Many codes were ported from #11494. Many efforts were put here. I think this PR should credit to @yzotov.

This PR can achieve 1.1-1.4x performance improvements for operations in UTF8String or Murmur3_x86_32. Other operations are almost comparable performances.

Without this PR

OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
Hash byte arrays with length 268435487:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32                                 526 /  536          0.0   131399881.5       1.0X

UTF8String benchmark:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hashCode                                       525 /  552       1022.6           1.0       1.0X
substring                                      414 /  423       1298.0           0.8       1.3X

With this PR

OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
Hash byte arrays with length 268435487:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32                                 474 /  488          0.0   118552232.0       1.0X

UTF8String benchmark:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hashCode                                       476 /  480       1127.3           0.9       1.0X
substring                                      287 /  291       1869.9           0.5       1.7X

Benchmark program

test("benchmark Murmur3_x86_32") {
  val length = 8192 * 32768 + 31
  val seed = 42L
  val iters = 1 << 2
  val random = new Random(seed)
  val arrays = Array.fill[MemoryBlock](numArrays) {
    val bytes = new Array[Byte](length)
    random.nextBytes(bytes)
    new ByteArrayMemoryBlock(bytes, Platform.BYTE_ARRAY_OFFSET, length)
  }
                                                                
  val benchmark = new Benchmark("Hash byte arrays with length " + length,
    iters * numArrays, minNumIters = 20) 
  benchmark.addCase("HiveHasher") { _: Int =>
    var sum = 0L
    for (_ <- 0L until iters) {
      sum += HiveHasher.hashUnsafeBytesBlock(
        arrays(i), Platform.BYTE_ARRAY_OFFSET, length)
    }
  }
  benchmark.run()   
}

test("benchmark UTF8String") {
  val N = 512 * 1024 * 1024
  val iters = 2
  val benchmark = new Benchmark("UTF8String benchmark", N, minNumIters = 20)
  val str0 = new java.io.StringWriter() { { for (i <- 0 until N) { write(" ") } } }.toString
  val s0 = UTF8String.fromString(str0)
  benchmark.addCase("hashCode") { _: Int =>
    var h: Int = 0
    for (_ <- 0L until iters) { h += s0.hashCode }
  }
  benchmark.addCase("substring") { _: Int =>
    var s: UTF8String = null
    for (_ <- 0L until iters) { s = s0.substring(N / 2 - 5, N / 2 + 5) }
  }
  benchmark.run()
}

I run this benchmark program using the commit. I got the following results:

OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
Memory access benchmarks:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ByteArrayMemoryBlock get/putInt()              220 /  221        609.3           1.6       1.0X
Platform get/putInt(byte[])                    220 /  236        610.9           1.6       1.0X
Platform get/putInt(Object)                    492 /  494        272.8           3.7       0.4X
OnHeapMemoryBlock get/putLong()                322 /  323        416.5           2.4       0.7X
long[]                                         221 /  221        608.0           1.6       1.0X
Platform get/putLong(long[])                   321 /  321        418.7           2.4       0.7X
Platform get/putLong(Object)                   561 /  563        239.2           4.2       0.4X

I also run this benchmark program for comparing performance of Platform.copyMemory().

OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
Platform copyMemory:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Object to Object                              1961 / 1967          8.6         116.9       1.0X
System.arraycopy Object to Object             1917 / 1921          8.8         114.3       1.0X
byte array to byte array                      1961 / 1968          8.6         116.9       1.0X
System.arraycopy byte array to byte array      1909 / 1937          8.8         113.8       1.0X
int array to int array                        1921 / 1990          8.7         114.5       1.0X
double array to double array                  1918 / 1923          8.7         114.3       1.0X
Object to byte array                          1961 / 1967          8.6         116.9       1.0X
Object to short array                         1965 / 1972          8.5         117.1       1.0X
Object to int array                           1910 / 1915          8.8         113.9       1.0X
Object to float array                         1971 / 1978          8.5         117.5       1.0X
Object to double array                        1919 / 1944          8.7         114.4       1.0X
byte array to Object                          1959 / 1967          8.6         116.8       1.0X
int array to Object                           1961 / 1970          8.6         116.9       1.0X
double array to Object                        1917 / 1924          8.8         114.3       1.0X

These results show three facts:

  1. According to the second/third or sixth/seventh results in the first experiment, if we use Platform.get/putInt(Object), we achieve more than 2x worse performance than Platform.get/putInt(byte[]) with concrete type (i.e. byte[]).
  2. According to the second/third or fourth/fifth/sixth results in the first experiment, the fastest way to access an array element on Java heap is array[]. Cons of array[] is that it is not possible to support unaligned-8byte access.
  3. According to the first/second/third or fourth/sixth/seventh results in the first experiment, getInt()/putInt() or getLong()/putLong() in subclasses of MemoryBlock can achieve comparable performance to Platform.get/putInt() or Platform.get/putLong() with concrete type (second or sixth result). There is no overhead regarding virtual call.
  4. According to results in the second experiment, for Platform.copy(), to pass Object can achieve the same performance as to pass any type of primitive array as source or destination.
  5. According to second/fourth results in the second experiment, Platform.copy() can achieve the same performance as System.arrayCopy. It would be good to use Platform.copy() since Platform.copy() can take any types for src and dst.

We are incrementally replace Platform.get/putXXX with MemoryBlock.get/putXXX. This is because we have two advantages.

  1. Achieve better performance due to having a concrete type for an array.
  2. Use simple OO design instead of passing Object
    It is easy to use MemoryBlock in InternalRow, BufferHolder, TaskMemoryManager, and others that are already abstracted. It is not easy to use MemoryBlock in utility classes related to hashing or others.

Other candidates are

  • UnsafeRow, UnsafeArrayData, UnsafeMapData, SpecificUnsafeRowJoiner
  • UTF8StringBuffer
  • BufferHolder
  • TaskMemoryManager
  • OnHeapColumnVector
  • BytesToBytesMap
  • CachedBatch
  • classes for hash
  • others.

How was this patch tested?

Added UnsafeMemoryAllocator

@SparkQA
Copy link

SparkQA commented Sep 13, 2017

Test build #81731 has finished for PR 19222 at commit b7ffa10.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -75,67 +76,131 @@ public static boolean unaligned() {
return unaligned;
}

public static int getInt(MemoryBlock object, long offset) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be a part of the memory block api? Instead of platform? Same for all other methods here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to move them (i.e. methods with MemoryBlock argument) into unsafe/memory/MemoryBlock.java?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved

MemoryBlock leftBase, long leftOffset, MemoryBlock rightBase, long rightOffset, final long length) {
int i = 0;

// check if stars align and we can get both offsets to be aligned
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stars align...? :)... starts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol :) If you scroll below, this is copy-paste from the L89 below .. so not introduced by this PR. I feel this might not be a typo and the author legitimately meant that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⭐️ ⭐️ ⭐️

/**
* A consecutive block of memory, starting at a {@link MemoryLocation} with a fixed size.
*/
public class ByteArrayMemoryBlock extends MemoryLocation implements MemoryBlock {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need different implementations of a MemoryBlock? We should be able to have all required functionality with one class, and a little bit of offset magic.

I am opposed to creating a class hierarchy because this is super performance critical code. Polymorphism will slow us down.

Copy link
Member Author

@kiszk kiszk Sep 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hvanhovell I agree with your concern about performance due to creating a class hierarchy.
This implementation uses class hierarchy for two cases

  1. Statically select target method using signature (e.g. Platform.getInt())
  2. Dynamically select target method using receiver (e.g. MemoryBlock.getBaseOffset())

Case 1 does not affect performance since it is resolved at javac. Case 2 is your concern. Since case 2. is used mainly for getBaseObject() and getBaseOffset() at performance critical path. These two methods must be final method (i.e. we expect method inlining by JIT compiler).

  1. MemoryLocation is dropped
  2. Several implementation classes extends MemoryBlock after making MemoryBlock abstract` as follows:
public abstract class MemoryBlock {
  Object obj;
  long offset;

  public MemoryBlock(Object obj, long offset) {
    this.obj = obj;
    this.offset = offset;
  }

  public final Object getBaseObject() { return obj; }
  public final long getBaseOffset() { return offset; }

  abstract long size();
  abstract void setPageNumber(int pageNum);
  abstract int getPageNumber();
  abstract void fill(byte value);
  abstract MemoryBlock allocate(long offset, long size);
}

What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

static {
try {
Class cb = UnsafeMemoryAllocator.class.getClassLoader().loadClass("java.nio.DirectByteBuffer");
bufAddrMethod = cb.getMethod("address");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use sun.nio.ch.DirectBuffer here. Both are evil.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I used this class.


/**
* Optional page number; used when this MemoryBlock represents a page allocated by a
* TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but its private at L34

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, updated comments

@@ -18,6 +18,7 @@
package org.apache.spark.unsafe.types;

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unwanted import ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

@@ -46,6 +47,42 @@ public static int roundNumberOfBytesToNearestWord(int numBytes) {
* @return true if the arrays are equal, false otherwise
*/
public static boolean arrayEquals(
MemoryBlock leftBase, long leftOffset, MemoryBlock rightBase, long rightOffset, final long length) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any semantic difference between this method and one right below ? Can we do getBaseObject() over the leftBase and rightBase and call the method below at L#85 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you missed this. I still see the code duplication. This entire method block could be replaced as:

arrayEquals(
  leftBase.getBaseObject(),
  leftOffset,
  rightBase.getBaseObject(),
  rightOffset,
  length
)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I missed this.

@@ -59,6 +60,18 @@ public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, i
return fmix(h1, lengthInBytes);
}

public static int hashUnsafeBytes(MemoryBlock base, long offset, int lengthInBytes, int seed) {
Copy link
Contributor

@tejasapatil tejasapatil Sep 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. I feel that we should not have 2 flavours of the same method. In case of bug fixes / changes, it might happen that people forget to change in both the places.

I know that doing it in this PR might be lot of changes and it will take time to do that in the entire codebase. Meantime, we could avoid having 2 versions of the same method.

Copy link
Member Author

@kiszk kiszk Sep 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense. Is it better to add postfix MB of a method name to another version of the method or add postfix Object of a method name to the original version of the method?

@HyukjinKwon
Copy link
Member

BTW, mind adding Closes #11494 in the PR description so that that one can be closed automatically when this one is merged?

@SparkQA
Copy link

SparkQA commented Sep 14, 2017

Test build #81793 has finished for PR 19222 at commit 9711251.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 18, 2017

Test build #81889 has finished for PR 19222 at commit 7c2c0cb.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -38,6 +39,10 @@ public static int hashLong(long input) {
return (int) ((input >>> 32) ^ input);
}

public static int hashUnsafeBytesMB(MemoryBlock base, long offset, int lengthInBytes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BytesMB seemed weird to read as MB is generally interpreted as mega bytes. How about hashUnsafeBlock ?

@SparkQA
Copy link

SparkQA commented Sep 21, 2017

Test build #82044 has finished for PR 19222 at commit 4606ecf.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

(I have been following this out of my curiosity but I think this should be credit to @kiszk ...)

@SparkQA
Copy link

SparkQA commented Sep 22, 2017

Test build #82083 has started for PR 19222 at commit 51d9935.

@hvanhovell
Copy link
Contributor

Circling back to the inheritance discussion. My worry is that this will introduce a lot polymorphic call sites in very performance critical code. Even if you tag on final to each method, the call site will still be monomorphic. There are two options from my end:

  1. Convince me wrong, by posting benchmark before and after benchmark results.
  2. Use a single class that combines the various implementations. This is very easy to do and will have relatively little overhead.

Also cc @rednaxelafx

In all I really think this work is worth the effort. It should make working with Spark internals a lot easier/safer.

@kiszk
Copy link
Member Author

kiszk commented Sep 22, 2017

As @hvanhovell pointed out, the first implementation introduced a lot polymorphic call sites in very performance critical code (e.g. getBaseObject() or getBaseOffset(). in the current implementation, while MemoryBlock class has subclasses, getBaseObject() and getBaseOffset() have a single implementation in MemoryBlock without polymorphic call site. Thus, I will take option 1 by using a micro benchmark program :)

Any comments are appreciated from @rednaxelafx

@hvanhovell
Copy link
Contributor

hvanhovell commented Sep 22, 2017

Ok, I missed that you had moved these into the base class. I still look forward to the benchmark :)...

I still think that the hierarchy does not give any benefit. All block subclasses implement size, fill, setPageNum & getPageNum in the same way, and the other methods are either unused or can be generalized without a problem. That leaves a few static constructors which can also be moved.

@SparkQA
Copy link

SparkQA commented Sep 22, 2017

Test build #82087 has started for PR 19222 at commit 24ad970.

@SparkQA
Copy link

SparkQA commented Sep 22, 2017

Test build #82078 has finished for PR 19222 at commit e4779d0.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 22, 2017

Test build #82089 has started for PR 19222 at commit 66bfbfc.

@SparkQA
Copy link

SparkQA commented Sep 22, 2017

Test build #82080 has finished for PR 19222 at commit 61d506e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Copy link
Member Author

kiszk commented Sep 23, 2017

Jenkins, retest this please

@SparkQA
Copy link

SparkQA commented Sep 23, 2017

Test build #82100 has finished for PR 19222 at commit 66bfbfc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 23, 2017

Test build #82116 has finished for PR 19222 at commit 8ec08ba.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Copy link
Member Author

kiszk commented Sep 23, 2017

Jenkins, retest this please

@SparkQA
Copy link

SparkQA commented Sep 23, 2017

Test build #82118 has finished for PR 19222 at commit 8ec08ba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 24, 2017

Test build #82132 has finished for PR 19222 at commit 7ec26f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class MemoryBlockSuite

@kiszk kiszk changed the title [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks to choose several types of memory block [SPARK-10399][SPARK-23879][CORE][SQL] Introduce multiple MemoryBlocks to choose several types of memory block Apr 6, 2018
@kiszk
Copy link
Member Author

kiszk commented Apr 6, 2018

@cloud-fan yeah, good point. I created a new JIRA entry which has sub-tasks for the following works.

ghost pushed a commit to dbtsai/spark that referenced this pull request Apr 6, 2018
… not expected to be supported

## What changes were proposed in this pull request?

This PR excludes an existing UT [`writeToOutputStreamUnderflow()`](https://github.com/apache/spark/blob/master/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java#L519-L532) in `UTF8StringSuite`.

As discussed [here](apache#19222 (comment)), the behavior of this test looks surprising. This test seems to access metadata area of the JVM object where is reserved by `Platform.BYTE_ARRAY_OFFSET`.

This test is introduced thru apache#16089 by NathanHowell. More specifically, [the commit](apache@27c102d) `Improve test coverage of UTFString.write` introduced this UT. However, I cannot find any discussion about this UT.

I think that it would be good to exclude this UT.

```java
  public void writeToOutputStreamUnderflow() throws IOException {
    // offset underflow is apparently supported?
    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);

    for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
      new UTF8String(
        new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i))
          .writeTo(outputStream);
      final ByteBuffer buffer = ByteBuffer.wrap(outputStream.toByteArray(), i, test.length);
      assertEquals("01234567", StandardCharsets.UTF_8.decode(buffer).toString());
      outputStream.reset();
    }
  }
```

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#20995 from kiszk/SPARK-23882.
asfgit pushed a commit that referenced this pull request Apr 6, 2018
## What changes were proposed in this pull request?

This PR fixes the following errors in [Java lint](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-lint/7717/console) after #19222 has been merged. These errors were pointed by ueshin .

```
[ERROR] src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java:[57] (sizes) LineLength: Line is longer than 100 characters (found 106).
[ERROR] src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java:[26,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java:[23,10] (modifier) ModifierOrder: 'public' modifier out of order with the JLS suggestions.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[64,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[69,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[74,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[79,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[84,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[89,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[94,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[99,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[104,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[109,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[114,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[119,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[124,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[129,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[60,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[65,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[70,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[75,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[80,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[85,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[90,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[95,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[100,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[105,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[110,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[115,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[120,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[125,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java:[114,16] (modifier) ModifierOrder: 'static' modifier out of order with the JLS suggestions.
[ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java:[20,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
[ERROR] src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java:[30,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.memory.MemoryBlock.
[ERROR] src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java:[126,15] (naming) MethodName: Method name 'ByteArrayMemoryBlockTest' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java:[143,15] (naming) MethodName: Method name 'OnHeapMemoryBlockTest' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java:[160,15] (naming) MethodName: Method name 'OffHeapArrayMemoryBlockTest' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java:[19,8] (imports) UnusedImports: Unused import - com.google.common.primitives.Ints.
[ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java:[21,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
[ERROR] src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java:[20,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
```

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <[email protected]>

Closes #20991 from kiszk/SPARK-10399-jlint.
robert3005 pushed a commit to palantir/spark that referenced this pull request Apr 7, 2018
…veral types of memory block

## What changes were proposed in this pull request?

This PR allows us to use one of several types of `MemoryBlock`, such as byte array, int array, long array, or `java.nio.DirectByteBuffer`. To use `java.nio.DirectByteBuffer` allows to have off heap memory which is automatically deallocated by JVM. `MemoryBlock`  class has primitive accessors like `Platform.getInt()`, `Platform.putint()`, or `Platform.copyMemory()`.

This PR uses `MemoryBlock` for `OffHeapColumnVector`, `UTF8String`, and other places. This PR can improve performance of operations involving memory accesses (e.g. `UTF8String.trim`) by 1.8x.

For now, this PR does not use `MemoryBlock` for `BufferHolder` based on cloud-fan's [suggestion](apache#11494 (comment)).

Since this PR is a successor of apache#11494, close apache#11494. Many codes were ported from apache#11494. Many efforts were put here. **I think this PR should credit to yzotov.**

This PR can achieve **1.1-1.4x performance improvements** for  operations in `UTF8String` or `Murmur3_x86_32`. Other operations are almost comparable performances.

Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Hash byte arrays with length 268435487:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32                                 526 /  536          0.0   131399881.5       1.0X

UTF8String benchmark:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hashCode                                       525 /  552       1022.6           1.0       1.0X
substring                                      414 /  423       1298.0           0.8       1.3X
```

With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Hash byte arrays with length 268435487:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32                                 474 /  488          0.0   118552232.0       1.0X

UTF8String benchmark:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hashCode                                       476 /  480       1127.3           0.9       1.0X
substring                                      287 /  291       1869.9           0.5       1.7X
```

Benchmark program
```
test("benchmark Murmur3_x86_32") {
  val length = 8192 * 32768 + 31
  val seed = 42L
  val iters = 1 << 2
  val random = new Random(seed)
  val arrays = Array.fill[MemoryBlock](numArrays) {
    val bytes = new Array[Byte](length)
    random.nextBytes(bytes)
    new ByteArrayMemoryBlock(bytes, Platform.BYTE_ARRAY_OFFSET, length)
  }

  val benchmark = new Benchmark("Hash byte arrays with length " + length,
    iters * numArrays, minNumIters = 20)
  benchmark.addCase("HiveHasher") { _: Int =>
    var sum = 0L
    for (_ <- 0L until iters) {
      sum += HiveHasher.hashUnsafeBytesBlock(
        arrays(i), Platform.BYTE_ARRAY_OFFSET, length)
    }
  }
  benchmark.run()
}

test("benchmark UTF8String") {
  val N = 512 * 1024 * 1024
  val iters = 2
  val benchmark = new Benchmark("UTF8String benchmark", N, minNumIters = 20)
  val str0 = new java.io.StringWriter() { { for (i <- 0 until N) { write(" ") } } }.toString
  val s0 = UTF8String.fromString(str0)
  benchmark.addCase("hashCode") { _: Int =>
    var h: Int = 0
    for (_ <- 0L until iters) { h += s0.hashCode }
  }
  benchmark.addCase("substring") { _: Int =>
    var s: UTF8String = null
    for (_ <- 0L until iters) { s = s0.substring(N / 2 - 5, N / 2 + 5) }
  }
  benchmark.run()
}
```

I run [this benchmark program](https://gist.github.com/kiszk/94f75b506c93a663bbbc372ffe8f05de) using [the commit](https://github.com/apache/spark/pull/19222/commits/ee5a79861c18725fb1cd9b518cdfd2489c05b81d6). I got the following results:

```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Memory access benchmarks:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ByteArrayMemoryBlock get/putInt()              220 /  221        609.3           1.6       1.0X
Platform get/putInt(byte[])                    220 /  236        610.9           1.6       1.0X
Platform get/putInt(Object)                    492 /  494        272.8           3.7       0.4X
OnHeapMemoryBlock get/putLong()                322 /  323        416.5           2.4       0.7X
long[]                                         221 /  221        608.0           1.6       1.0X
Platform get/putLong(long[])                   321 /  321        418.7           2.4       0.7X
Platform get/putLong(Object)                   561 /  563        239.2           4.2       0.4X
```

I also run [this benchmark program](https://gist.github.com/kiszk/5fdb4e03733a5d110421177e289d1fb5) for comparing performance of `Platform.copyMemory()`.
```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Platform copyMemory:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Object to Object                              1961 / 1967          8.6         116.9       1.0X
System.arraycopy Object to Object             1917 / 1921          8.8         114.3       1.0X
byte array to byte array                      1961 / 1968          8.6         116.9       1.0X
System.arraycopy byte array to byte array      1909 / 1937          8.8         113.8       1.0X
int array to int array                        1921 / 1990          8.7         114.5       1.0X
double array to double array                  1918 / 1923          8.7         114.3       1.0X
Object to byte array                          1961 / 1967          8.6         116.9       1.0X
Object to short array                         1965 / 1972          8.5         117.1       1.0X
Object to int array                           1910 / 1915          8.8         113.9       1.0X
Object to float array                         1971 / 1978          8.5         117.5       1.0X
Object to double array                        1919 / 1944          8.7         114.4       1.0X
byte array to Object                          1959 / 1967          8.6         116.8       1.0X
int array to Object                           1961 / 1970          8.6         116.9       1.0X
double array to Object                        1917 / 1924          8.8         114.3       1.0X
```

These results show three facts:
1. According to the second/third or sixth/seventh results in the first experiment, if we use `Platform.get/putInt(Object)`, we achieve more than 2x worse performance than `Platform.get/putInt(byte[])` with concrete type (i.e. `byte[]`).
2. According to the second/third or fourth/fifth/sixth results in the first experiment, the fastest way to access an array element on Java heap is `array[]`. **Cons of `array[]` is that it is not possible to support unaligned-8byte access.**
3. According to the first/second/third or fourth/sixth/seventh results in the first experiment, `getInt()/putInt() or getLong()/putLong()` in subclasses of `MemoryBlock` can achieve comparable performance to `Platform.get/putInt()` or `Platform.get/putLong()` with concrete type (second or sixth result). There is no overhead regarding virtual call.
4. According to results in the second experiment, for `Platform.copy()`, to pass `Object` can achieve the same performance as to pass any type of primitive array as source or destination.
5. According to second/fourth results in the second experiment, `Platform.copy()` can achieve the same performance as `System.arrayCopy`. **It would be good to use `Platform.copy()` since `Platform.copy()` can take any types for src and dst.**

We are incrementally replace `Platform.get/putXXX` with `MemoryBlock.get/putXXX`. This is because we have two advantages.
1) Achieve better performance due to having a concrete type for an array.
2) Use simple OO design instead of passing `Object`
It is easy to use `MemoryBlock` in `InternalRow`, `BufferHolder`, `TaskMemoryManager`, and others that are already abstracted. It is not easy to use `MemoryBlock` in utility classes related to hashing or others.

Other candidates are
- UnsafeRow, UnsafeArrayData, UnsafeMapData, SpecificUnsafeRowJoiner
- UTF8StringBuffer
- BufferHolder
- TaskMemoryManager
- OnHeapColumnVector
- BytesToBytesMap
- CachedBatch
- classes for hash
- others.

## How was this patch tested?

Added `UnsafeMemoryAllocator`

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#19222 from kiszk/SPARK-10399.
robert3005 pushed a commit to palantir/spark that referenced this pull request Apr 7, 2018
… not expected to be supported

## What changes were proposed in this pull request?

This PR excludes an existing UT [`writeToOutputStreamUnderflow()`](https://github.com/apache/spark/blob/master/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java#L519-L532) in `UTF8StringSuite`.

As discussed [here](apache#19222 (comment)), the behavior of this test looks surprising. This test seems to access metadata area of the JVM object where is reserved by `Platform.BYTE_ARRAY_OFFSET`.

This test is introduced thru apache#16089 by NathanHowell. More specifically, [the commit](apache@27c102d) `Improve test coverage of UTFString.write` introduced this UT. However, I cannot find any discussion about this UT.

I think that it would be good to exclude this UT.

```java
  public void writeToOutputStreamUnderflow() throws IOException {
    // offset underflow is apparently supported?
    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);

    for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
      new UTF8String(
        new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i))
          .writeTo(outputStream);
      final ByteBuffer buffer = ByteBuffer.wrap(outputStream.toByteArray(), i, test.length);
      assertEquals("01234567", StandardCharsets.UTF_8.decode(buffer).toString());
      outputStream.reset();
    }
  }
```

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#20995 from kiszk/SPARK-23882.
robert3005 pushed a commit to palantir/spark that referenced this pull request Apr 7, 2018
## What changes were proposed in this pull request?

This PR fixes the following errors in [Java lint](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-lint/7717/console) after apache#19222 has been merged. These errors were pointed by ueshin .

```
[ERROR] src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java:[57] (sizes) LineLength: Line is longer than 100 characters (found 106).
[ERROR] src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java:[26,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java:[23,10] (modifier) ModifierOrder: 'public' modifier out of order with the JLS suggestions.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[64,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[69,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[74,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[79,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[84,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[89,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[94,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[99,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[104,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[109,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[114,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[119,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[124,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[129,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[60,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[65,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[70,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[75,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[80,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[85,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[90,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[95,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[100,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[105,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[110,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[115,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[120,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[125,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java:[114,16] (modifier) ModifierOrder: 'static' modifier out of order with the JLS suggestions.
[ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java:[20,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
[ERROR] src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java:[30,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.memory.MemoryBlock.
[ERROR] src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java:[126,15] (naming) MethodName: Method name 'ByteArrayMemoryBlockTest' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java:[143,15] (naming) MethodName: Method name 'OnHeapMemoryBlockTest' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java:[160,15] (naming) MethodName: Method name 'OffHeapArrayMemoryBlockTest' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java:[19,8] (imports) UnusedImports: Unused import - com.google.common.primitives.Ints.
[ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java:[21,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
[ERROR] src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java:[20,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
```

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#20991 from kiszk/SPARK-10399-jlint.
memory = new OffHeapMemoryBlock(address, length);
obj = memory.getBaseObject();
offset = memory.getBaseOffset();
check(memory, obj, offset, length);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kiszk We should free the allocated two offheap memory in this test. Can you fix it in your follow-ups?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Sorry, I overlooked this comment. I will address this.

@rxin
Copy link
Contributor

rxin commented Apr 18, 2018

Sorry this thread is too long for me to follow. I might be bringing up a point that has been brought up before.

@kiszk did your perf tests take into account megamorphic callsites? It seems to me from a quick cursory look the benchmark result might not be accurate for real workloads if there are only one implementations of the MemoryBlock loaded.

@kiszk
Copy link
Member Author

kiszk commented Apr 18, 2018

@rxin While I did not perf microbench for megamorphic (up to 3 ByteArrayMemoryBlock, OnHeapMemoryBlock, and OffHeapMemoryBlock) callsites, we confirmed that there is no performance regression in TPC-DS. It seem to be representative regarding real workloads.

I will create and perf microbench for megamorphic (up to 3 ByteArrayMemoryBlock, OnHeapMemoryBlock, and OffHeapMemoryBlock) callsites.

@rxin
Copy link
Contributor

rxin commented Apr 18, 2018

OK thanks please do that. Does TPC-DS even trigger 2 call sites? E.g. ByteArrayMemoryBlock and OnHeapMemoryBlock. Even there it might introduce a conditional branch after JIT that could lead to perf degradation.

I also really worry about off-heap mode, in which all three callsites can exist and lead to massive degradation.

@rxin
Copy link
Contributor

rxin commented Apr 20, 2018

@kiszk do you have more data now?

@kiszk
Copy link
Member Author

kiszk commented Apr 20, 2018

@rxin Sorry, I do not have more data now. I will work for this next week.

I am thinking about the follow type program where memoryBlock.putLong() and memoryBlock.getLong() become megamorphic call sites.
As well-known, while there are several some sub-classes, each call site is usually dominated by one receiver class. I believable that MemoryBlock also works with one dominated receiver class while two or three classes are instantiated. This benchmark tries to capture this behavior.

Are we on the same page?

val onHeapMB = OnHeapMemoryBlock.fromArray(new Array[Long](N))
val offHeapMB = new OffHeapMemoryBlock(address, N)
val byteMB = ByteArrayMemoryBlock.fromArray(new Array[Byte](N*8))

var type: Int = Rand(3)
val memoryBlock: MemoryBlock = if (type == 0) onHeapMB else if (type == 1) byteMB else offHeapMB

benchmark.addCase("MemoryBlock get/putLong()") { _: Int =>
  for (_ <- 0L until iters) {
    var sum = 0L
    var i = 0
    while (i < N) {
      memoryBlock.putLong(i * 8, i)
      i += 1
    }
    i = 0
    while (i < N) {
      sum += memoryBlock.getLong(i * 8)
      i += 1
    }
  }
}

ghost pushed a commit to dbtsai/spark that referenced this pull request Apr 23, 2018
…y in MemoryBlockSuite

## What changes were proposed in this pull request?

As viirya pointed out [here](apache#19222 (comment)), this PR explicitly frees unused off-heap memory in `MemoryBlockSuite`

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#21117 from kiszk/SPARK-10399-free-offheap.
@cloud-fan
Copy link
Contributor

Instead of round-robin the memory block type across iterations, can we just operate on all the memory blocks in each iteration? Then we can remove the if-else and make the benchmark focus more on the memory block?

As a comparison, we can create a byte array, a long array, an offheap array, and also operate on them in each iteration.

@kiszk
Copy link
Member Author

kiszk commented Apr 23, 2018

I think that one memory block in each iteration is more representative with having possibility of megamorphism. This is because in the typicalusages in Spark, a data structure is actually dominated by one of memory types.
For example, UTF8String uses only ByteArrayMemoryBlock while OnHeapMemoryBlock and ByteArrayMemoryBlock are loaded
In the future, I think that we will use only one of three Memoryblocks for UnsafeRow depends on the setting in SparkConf. We will not use OffHeapMemoryBlock for some of UnsafeRow and OnHeapMemoryBlock for the rest of UnsafeRow.

I think that current concern is whether there is performance degradation at possible megamorphic call sites when three MemoryBlock are loaded.

WDYT?

@kiszk
Copy link
Member Author

kiszk commented Apr 25, 2018

@rxin @cloud-fan I am very sorry for preparing performance data since I was busy these weeks.
I confirmed that primitive operations in MemoryBlock approach is faster for OnHeap and is equal to for OffHeap compared to them in Platform approach.

In the table, Platform getInt(Object byte[]) is slower than other *MemoryBlock.getInt(). Platform getInt(Object offHeap) is equal to OffHeapMemoryBlock get().
Platform putInt(Object byte[]) is slower than other *MemoryBlock.putInt(). Platform putInt(Object offHeap) is equal to OffHeapMemoryBlock putInt().

I ran this benchmark program that includes potentially polymorphic call for MemoryBlock at mb.getInt() like

    def getMB(g: Int): MemoryBlock = {
      if (g < mIters) {
        byteArrayMB
      } else if (g < mIters * 2) {
        onHeapMB
      } else if (g < mIters * 3) {
        offHeapMB
        ...
      } else {
        null
      }
    }

    benchmark.addCase("ByteArrayMemoryBlock getInt()") { t: Int =>
      val mb = getMB(g)
      if (t >= 0) g += 1
      var sum = 0
      for (_ <- 0L until iters) {
        var i = 0
        while (i < N / 4) {
          sum += mb.getInt(i * 4)
          i += 1
        }
      }
    }

OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
Memory access benchmarks:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Platform getInt(Object byte[])                1348 / 1355        199.1           5.0       1.0X
ByteArrayMemoryBlock getInt()                  375 /  375        716.6           1.4       3.6X
OnHeapMemoryBlock getInt()                     374 /  376        717.1           1.4       3.6X
Platform getInt(Object offHeap)                327 /  328        820.0           1.2       4.1X
OffHeapMemoryBlock get()                       325 /  403        827.1           1.2       4.2X
Platform putInt(Object byte[])                 848 /  852        316.5           3.2       1.6X
ByteArrayMemoryBlock putInt()                  571 /  571        470.4           2.1       2.4X
OnHeapMemoryBlock putInt()                     575 /  575        467.2           2.1       2.3X
Platform putInt(Object offHeap)                582 /  584        461.0           2.2       2.3X
OffHeapMemoryBlock putInt()                    583 /  584        460.1           2.2       2.3X

WDYT?

@kiszk
Copy link
Member Author

kiszk commented Apr 30, 2018

@rxin What do you think about this data? Could you let us know if you still have any question?

@kiszk
Copy link
Member Author

kiszk commented Jun 13, 2018

ping @rednaxelafx

4 similar comments
@kiszk
Copy link
Member Author

kiszk commented Jul 6, 2018

ping @rednaxelafx

@kiszk
Copy link
Member Author

kiszk commented Jul 22, 2018

ping @rednaxelafx

@kiszk
Copy link
Member Author

kiszk commented Aug 6, 2018

ping @rednaxelafx

@kiszk
Copy link
Member Author

kiszk commented Aug 14, 2018

ping @rednaxelafx

asfgit pushed a commit that referenced this pull request Sep 9, 2018
## What changes were proposed in this pull request?

When running TPC-DS benchmarks on 2.4 release, npoggi and winglungngai  saw more than 10% performance regression on the following queries: q67, q24a and q24b. After we applying the PR #22338, the performance regression still exists. If we revert the changes in #19222, npoggi and winglungngai  found the performance regression was resolved. Thus, this PR is to revert the related changes for unblocking the 2.4 release.

In the future release, we still can continue the investigation and find out the root cause of the regression.

## How was this patch tested?

The existing test cases

Closes #22361 from gatorsmile/revertMemoryBlock.

Authored-by: gatorsmile <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 0b9ccd5)
Signed-off-by: Wenchen Fan <[email protected]>
asfgit pushed a commit that referenced this pull request Sep 9, 2018
## What changes were proposed in this pull request?

When running TPC-DS benchmarks on 2.4 release, npoggi and winglungngai  saw more than 10% performance regression on the following queries: q67, q24a and q24b. After we applying the PR #22338, the performance regression still exists. If we revert the changes in #19222, npoggi and winglungngai  found the performance regression was resolved. Thus, this PR is to revert the related changes for unblocking the 2.4 release.

In the future release, we still can continue the investigation and find out the root cause of the regression.

## How was this patch tested?

The existing test cases

Closes #22361 from gatorsmile/revertMemoryBlock.

Authored-by: gatorsmile <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants