From 73f9e9bfba56cb9c907a11e7159549e3cd4bdfb2 Mon Sep 17 00:00:00 2001 From: Shawn Yang Date: Sat, 27 Apr 2024 12:15:04 +0800 Subject: [PATCH] fix(java): grow buffer to interger.max (#1582) ## What does this PR do? grow buffer to interger.max ## Related issues #1576 ## Does this PR introduce any user-facing change? - [ ] Does this PR introduce any public API change? - [ ] Does this PR introduce any binary protocol compatibility change? ## Benchmark --- .../org/apache/fury/io/FuryInputStream.java | 9 ++++----- .../apache/fury/io/FuryReadableChannel.java | 4 +++- .../org/apache/fury/io/FuryStreamReader.java | 1 - .../org/apache/fury/memory/MemoryBuffer.java | 20 +++++++++++++++---- 4 files changed, 23 insertions(+), 11 deletions(-) diff --git a/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java b/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java index 7096bd2186..3dce088aa0 100644 --- a/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java +++ b/java/fury-core/src/main/java/org/apache/fury/io/FuryInputStream.java @@ -77,11 +77,10 @@ public int fillBuffer(int minFillSize) { private static byte[] growBuffer(int minFillSize, MemoryBuffer buffer) { int newSize; int targetSize = buffer.size() + minFillSize; - if (targetSize < BUFFER_GROW_STEP_THRESHOLD) { - newSize = targetSize << 2; - } else { - newSize = (int) (targetSize * 1.5); - } + newSize = + targetSize < MemoryBuffer.BUFFER_GROW_STEP_THRESHOLD + ? targetSize << 2 + : (int) Math.min(targetSize * 1.5d, Integer.MAX_VALUE); byte[] newBuffer = new byte[newSize]; byte[] heapMemory = buffer.getHeapMemory(); System.arraycopy(heapMemory, 0, newBuffer, 0, buffer.size()); diff --git a/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java b/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java index 293620b706..245a34b459 100644 --- a/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java +++ b/java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java @@ -56,7 +56,9 @@ public int fillBuffer(int minFillSize) { int newLimit = position + minFillSize; if (newLimit > byteBuf.capacity()) { int newSize = - newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) (newLimit * 1.5); + newLimit < MemoryBuffer.BUFFER_GROW_STEP_THRESHOLD + ? newLimit << 2 + : (int) Math.min(newLimit * 1.5d, Integer.MAX_VALUE); ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); byteBuf.position(0); newByteBuf.put(byteBuf); diff --git a/java/fury-core/src/main/java/org/apache/fury/io/FuryStreamReader.java b/java/fury-core/src/main/java/org/apache/fury/io/FuryStreamReader.java index e02d4efbca..4d4c6d5ee4 100644 --- a/java/fury-core/src/main/java/org/apache/fury/io/FuryStreamReader.java +++ b/java/fury-core/src/main/java/org/apache/fury/io/FuryStreamReader.java @@ -26,7 +26,6 @@ /** A streaming reader to make {@link MemoryBuffer} to support streaming reading. */ public interface FuryStreamReader { - int BUFFER_GROW_STEP_THRESHOLD = 100 * 1024 * 1024; /** * Read stream and fill the data to underlying {@link MemoryBuffer}, which is also the buffer diff --git a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java index f5b1eea48d..d57009eb54 100644 --- a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java +++ b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java @@ -60,6 +60,7 @@ * DesiredMethodLimit,MaxRecursiveInlineLevel,FreqInlineSize,MaxInlineSize */ public final class MemoryBuffer { + public static final int BUFFER_GROW_STEP_THRESHOLD = 100 * 1024 * 1024; private static final Unsafe UNSAFE = Platform.UNSAFE; private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN); @@ -1204,18 +1205,29 @@ public void writePrimitiveArray(Object arr, int offset, int numBytes) { /** For off-heap buffer, this will make a heap buffer internally. */ public void grow(int neededSize) { - ensure(writerIndex + neededSize); + int length = writerIndex + neededSize; + if (length > size) { + growBuffer(length); + } } /** For off-heap buffer, this will make a heap buffer internally. */ public void ensure(int length) { if (length > size) { - byte[] data = new byte[length * 2]; - copyToUnsafe(0, data, Platform.BYTE_ARRAY_OFFSET, size()); - initHeapBuffer(data, 0, data.length); + growBuffer(length); } } + private void growBuffer(int length) { + int newSize = + length < BUFFER_GROW_STEP_THRESHOLD + ? length << 2 + : (int) Math.min(length * 1.5d, Integer.MAX_VALUE); + byte[] data = new byte[newSize]; + copyToUnsafe(0, data, Platform.BYTE_ARRAY_OFFSET, size()); + initHeapBuffer(data, 0, data.length); + } + // ------------------------------------------------------------------------- // Read Methods // -------------------------------------------------------------------------