From 5e04e6619bd79bd0bb0138aeb82a26fd0f556910 Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Sat, 27 Apr 2024 11:41:20 +0800 Subject: [PATCH 1/5] grow buffer to interger.max --- .../src/main/java/org/apache/fury/io/FuryInputStream.java | 7 ++----- .../main/java/org/apache/fury/io/FuryReadableChannel.java | 4 ++-- .../src/main/java/org/apache/fury/memory/MemoryBuffer.java | 3 ++- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java b/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java index 7096bd2186..77d32787b0 100644 --- a/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java +++ b/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java @@ -77,11 +77,8 @@ public int fillBuffer(int minFillSize) { private static byte[] growBuffer(int minFillSize, MemoryBuffer buffer) { int newSize; int targetSize = buffer.size() + minFillSize; - if (targetSize < BUFFER_GROW_STEP_THRESHOLD) { - newSize = targetSize << 2; - } else { - newSize = (int) (targetSize * 1.5); - } + newSize = targetSize < BUFFER_GROW_STEP_THRESHOLD ? targetSize << 2 : + (int) Math.min(targetSize * 1.5d, Integer.MAX_VALUE); byte[] newBuffer = new byte[newSize]; byte[] heapMemory = buffer.getHeapMemory(); System.arraycopy(heapMemory, 0, newBuffer, 0, buffer.size()); diff --git a/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java b/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java index 293620b706..876d99e667 100644 --- a/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java +++ b/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java @@ -55,8 +55,8 @@ public int fillBuffer(int minFillSize) { int position = byteBuf.position(); int newLimit = position + minFillSize; if (newLimit > byteBuf.capacity()) { - int newSize = - newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) (newLimit * 1.5); + int newSize = newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : + (int) Math.min(newLimit * 1.5d, Integer.MAX_VALUE); ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); byteBuf.position(0); newByteBuf.put(byteBuf); diff --git a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java index f5b1eea48d..1982a36f11 100644 --- a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java +++ b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java @@ -1210,7 +1210,8 @@ public void grow(int neededSize) { /** For off-heap buffer, this will make a heap buffer internally. */ public void ensure(int length) { if (length > size) { - byte[] data = new byte[length * 2]; + int newSize = length < 104857600 ? length << 2 : (int) Math.min(length * 1.5d, Integer.MAX_VALUE); + byte[] data = new byte[newSize]; copyToUnsafe(0, data, Platform.BYTE_ARRAY_OFFSET, size()); initHeapBuffer(data, 0, data.length); } From 62d3ab950cafda7783c3da34f0c84699fb5f6c71 Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Sat, 27 Apr 2024 11:43:51 +0800 Subject: [PATCH 2/5] lint code --- .../src/main/java/org/apache/fury/io/FuryInputStream.java | 6 ++++-- .../main/java/org/apache/fury/io/FuryReadableChannel.java | 6 ++++-- .../src/main/java/org/apache/fury/memory/MemoryBuffer.java | 3 ++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java b/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java index 77d32787b0..48916fbe30 100644 --- a/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java +++ b/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java @@ -77,8 +77,10 @@ public int fillBuffer(int minFillSize) { private static byte[] growBuffer(int minFillSize, MemoryBuffer buffer) { int newSize; int targetSize = buffer.size() + minFillSize; - newSize = targetSize < BUFFER_GROW_STEP_THRESHOLD ? targetSize << 2 : - (int) Math.min(targetSize * 1.5d, Integer.MAX_VALUE); + newSize = + targetSize < BUFFER_GROW_STEP_THRESHOLD + ? targetSize << 2 + : (int) Math.min(targetSize * 1.5d, Integer.MAX_VALUE); byte[] newBuffer = new byte[newSize]; byte[] heapMemory = buffer.getHeapMemory(); System.arraycopy(heapMemory, 0, newBuffer, 0, buffer.size()); diff --git a/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java b/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java index 876d99e667..65b35f9eab 100644 --- a/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java +++ b/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java @@ -55,8 +55,10 @@ public int fillBuffer(int minFillSize) { int position = byteBuf.position(); int newLimit = position + minFillSize; if (newLimit > byteBuf.capacity()) { - int newSize = newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : - (int) Math.min(newLimit * 1.5d, Integer.MAX_VALUE); + int newSize = + newLimit < BUFFER_GROW_STEP_THRESHOLD + ? newLimit << 2 + : (int) Math.min(newLimit * 1.5d, Integer.MAX_VALUE); ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); byteBuf.position(0); newByteBuf.put(byteBuf); diff --git a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java index 1982a36f11..6c55fe5231 100644 --- a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java +++ b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java @@ -1210,7 +1210,8 @@ public void grow(int neededSize) { /** For off-heap buffer, this will make a heap buffer internally. */ public void ensure(int length) { if (length > size) { - int newSize = length < 104857600 ? length << 2 : (int) Math.min(length * 1.5d, Integer.MAX_VALUE); + int newSize = + length < 104857600 ? length << 2 : (int) Math.min(length * 1.5d, Integer.MAX_VALUE); byte[] data = new byte[newSize]; copyToUnsafe(0, data, Platform.BYTE_ARRAY_OFFSET, size()); initHeapBuffer(data, 0, data.length); From 90e2cb3af3a8761fad300c644f102582f1b8d0d8 Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Sat, 27 Apr 2024 11:46:23 +0800 Subject: [PATCH 3/5] make growBuffer in a separate method --- .../org/apache/fury/memory/MemoryBuffer.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java index 6c55fe5231..237fce13b9 100644 --- a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java +++ b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java @@ -1204,20 +1204,27 @@ public void writePrimitiveArray(Object arr, int offset, int numBytes) { /** For off-heap buffer, this will make a heap buffer internally. */ public void grow(int neededSize) { - ensure(writerIndex + neededSize); + int length = writerIndex + neededSize; + if (length > size) { + growBuffer(length); + } } /** For off-heap buffer, this will make a heap buffer internally. */ public void ensure(int length) { if (length > size) { - int newSize = - length < 104857600 ? length << 2 : (int) Math.min(length * 1.5d, Integer.MAX_VALUE); - byte[] data = new byte[newSize]; - copyToUnsafe(0, data, Platform.BYTE_ARRAY_OFFSET, size()); - initHeapBuffer(data, 0, data.length); + growBuffer(length); } } + private void growBuffer(int length) { + int newSize = + length < 104857600 ? length << 2 : (int) Math.min(length * 1.5d, Integer.MAX_VALUE); + byte[] data = new byte[newSize]; + copyToUnsafe(0, data, Platform.BYTE_ARRAY_OFFSET, size()); + initHeapBuffer(data, 0, data.length); + } + // ------------------------------------------------------------------------- // Read Methods // ------------------------------------------------------------------------- From b1cbf81f673c0d9a5910575bbd44a45998126502 Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Sat, 27 Apr 2024 11:55:44 +0800 Subject: [PATCH 4/5] lint code --- .../src/main/java/org/apache/fury/memory/MemoryBuffer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java index 237fce13b9..8adfc7d4b8 100644 --- a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java +++ b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java @@ -1219,7 +1219,7 @@ public void ensure(int length) { private void growBuffer(int length) { int newSize = - length < 104857600 ? length << 2 : (int) Math.min(length * 1.5d, Integer.MAX_VALUE); + length < 104857600 ? length << 2 : (int) Math.min(length * 1.5d, Integer.MAX_VALUE); byte[] data = new byte[newSize]; copyToUnsafe(0, data, Platform.BYTE_ARRAY_OFFSET, size()); initHeapBuffer(data, 0, data.length); From 2499a295de7f356727d503fc6c7dc7e87841e3c8 Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Sat, 27 Apr 2024 12:09:47 +0800 Subject: [PATCH 5/5] reuse buffer threshold --- .../src/main/java/org/apache/fury/io/FuryInputStream.java | 2 +- .../main/java/org/apache/fury/io/FuryReadableChannel.java | 2 +- .../src/main/java/org/apache/fury/io/FuryStreamReader.java | 1 - .../src/main/java/org/apache/fury/memory/MemoryBuffer.java | 5 ++++- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java b/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java index 48916fbe30..3dce088aa0 100644 --- a/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java +++ b/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java @@ -78,7 +78,7 @@ private static byte[] growBuffer(int minFillSize, MemoryBuffer buffer) { int newSize; int targetSize = buffer.size() + minFillSize; newSize = - targetSize < BUFFER_GROW_STEP_THRESHOLD + targetSize < MemoryBuffer.BUFFER_GROW_STEP_THRESHOLD ? targetSize << 2 : (int) Math.min(targetSize * 1.5d, Integer.MAX_VALUE); byte[] newBuffer = new byte[newSize]; diff --git a/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java b/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java index 65b35f9eab..245a34b459 100644 --- a/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java +++ b/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java @@ -56,7 +56,7 @@ public int fillBuffer(int minFillSize) { int newLimit = position + minFillSize; if (newLimit > byteBuf.capacity()) { int newSize = - newLimit < BUFFER_GROW_STEP_THRESHOLD + newLimit < MemoryBuffer.BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) Math.min(newLimit * 1.5d, Integer.MAX_VALUE); ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); diff --git a/java/fury-core/src/main/java/org/apache/fury/io/FuryStreamReader.java b/java/fury-core/src/main/java/org/apache/fury/io/FuryStreamReader.java index e02d4efbca..4d4c6d5ee4 100644 --- a/java/fury-core/src/main/java/org/apache/fury/io/FuryStreamReader.java +++ b/java/fury-core/src/main/java/org/apache/fury/io/FuryStreamReader.java @@ -26,7 +26,6 @@ /** A streaming reader to make {@link MemoryBuffer} to support streaming reading. */ public interface FuryStreamReader { - int BUFFER_GROW_STEP_THRESHOLD = 100 * 1024 * 1024; /** * Read stream and fill the data to underlying {@link MemoryBuffer}, which is also the buffer diff --git a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java index 8adfc7d4b8..d57009eb54 100644 --- a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java +++ b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java @@ -60,6 +60,7 @@ * DesiredMethodLimit,MaxRecursiveInlineLevel,FreqInlineSize,MaxInlineSize */ public final class MemoryBuffer { + public static final int BUFFER_GROW_STEP_THRESHOLD = 100 * 1024 * 1024; private static final Unsafe UNSAFE = Platform.UNSAFE; private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN); @@ -1219,7 +1220,9 @@ public void ensure(int length) { private void growBuffer(int length) { int newSize = - length < 104857600 ? length << 2 : (int) Math.min(length * 1.5d, Integer.MAX_VALUE); + length < BUFFER_GROW_STEP_THRESHOLD + ? length << 2 + : (int) Math.min(length * 1.5d, Integer.MAX_VALUE); byte[] data = new byte[newSize]; copyToUnsafe(0, data, Platform.BYTE_ARRAY_OFFSET, size()); initHeapBuffer(data, 0, data.length);