From 63f4455ca498ccdb3c92b3b8d70527febf9c7776 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 1 Sep 2021 13:00:48 +0800 Subject: [PATCH] Add AutoBufferLedger (#31) --- .../jni/DirectReservationListener.java | 42 +- .../jni/UnsafeRecordBatchSerializer.java | 33 +- .../arrow/memory/NativeUnderlyingMemory.java | 4 +- .../arrow/memory/AllocationManager.java | 2 +- .../org/apache/arrow/memory/ArrowBuf.java | 67 ++- .../apache/arrow/memory/AutoBufferLedger.java | 206 +++++++++ .../apache/arrow/memory/BaseAllocator.java | 18 +- .../arrow/memory/BaseReferenceManager.java | 211 +++++++++ .../apache/arrow/memory/BufferAllocator.java | 8 + .../org/apache/arrow/memory/BufferLedger.java | 409 +++++------------- .../memory/DirectAllocationListener.java | 38 ++ .../arrow/memory/LegacyBufferLedger.java | 168 +++++++ .../arrow/memory/ReferenceCountAware.java | 57 +++ .../apache/arrow/memory/ReferenceManager.java | 38 +- .../apache/arrow/memory/util/MemoryUtil.java | 94 ++++ .../arrow/memory/TestAutoBufferLedger.java | 189 ++++++++ 16 files changed, 1176 insertions(+), 408 deletions(-) create mode 100644 java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java create mode 100644 java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseReferenceManager.java create mode 100644 java/memory/memory-core/src/main/java/org/apache/arrow/memory/DirectAllocationListener.java create mode 100644 java/memory/memory-core/src/main/java/org/apache/arrow/memory/LegacyBufferLedger.java create mode 100644 java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceCountAware.java create mode 100644 java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAutoBufferLedger.java diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java index 72a1cadcf69b9..57ea5f19a7e02 100644 --- 
a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java @@ -21,6 +21,7 @@ import java.lang.reflect.Method; import java.util.concurrent.atomic.AtomicLong; +import org.apache.arrow.memory.util.MemoryUtil; import org.apache.arrow.util.VisibleForTesting; /** @@ -29,20 +30,6 @@ * "-XX:MaxDirectMemorySize". */ public class DirectReservationListener implements ReservationListener { - private final Method methodReserve; - private final Method methodUnreserve; - - private DirectReservationListener() { - try { - final Class classBits = Class.forName("java.nio.Bits"); - methodReserve = classBits.getDeclaredMethod("reserveMemory", long.class, int.class); - methodReserve.setAccessible(true); - methodUnreserve = classBits.getDeclaredMethod("unreserveMemory", long.class, int.class); - methodUnreserve.setAccessible(true); - } catch (Exception e) { - throw new RuntimeException(e); - } - } private static final DirectReservationListener INSTANCE = new DirectReservationListener(); @@ -55,14 +42,7 @@ public static DirectReservationListener instance() { */ @Override public void reserve(long size) { - try { - if (size > Integer.MAX_VALUE) { - throw new IllegalArgumentException("reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)"); - } - methodReserve.invoke(null, (int) size, (int) size); - } catch (Exception e) { - throw new RuntimeException(e); - } + MemoryUtil.reserveDirectMemory(size); } /** @@ -70,14 +50,7 @@ public void reserve(long size) { */ @Override public void unreserve(long size) { - try { - if (size > Integer.MAX_VALUE) { - throw new IllegalArgumentException("unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)"); - } - methodUnreserve.invoke(null, (int) size, (int) size); - } catch (Exception e) { - throw new RuntimeException(e); - } + MemoryUtil.unreserveDirectMemory(size); } /** @@ -85,13 +58,6 @@ public void 
unreserve(long size) { */ @VisibleForTesting public long getCurrentDirectMemReservation() { - try { - final Class classBits = Class.forName("java.nio.Bits"); - final Field f = classBits.getDeclaredField("reservedMemory"); - f.setAccessible(true); - return ((AtomicLong) f.get(null)).get(); - } catch (Exception e) { - throw new RuntimeException(e); - } + return MemoryUtil.getCurrentDirectMemReservation(); } } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/UnsafeRecordBatchSerializer.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/UnsafeRecordBatchSerializer.java index 11402738fbc1a..e1b6766f6ad60 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/UnsafeRecordBatchSerializer.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/UnsafeRecordBatchSerializer.java @@ -35,10 +35,7 @@ import org.apache.arrow.flatbuf.Message; import org.apache.arrow.flatbuf.MessageHeader; import org.apache.arrow.flatbuf.RecordBatch; -import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.BufferLedger; -import org.apache.arrow.memory.NativeUnderlyingMemory; +import org.apache.arrow.memory.*; import org.apache.arrow.memory.util.LargeMemoryUtil; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.compression.NoCompressionCodec; @@ -122,19 +119,19 @@ public static ArrowRecordBatch deserializeUnsafe( throw new IllegalArgumentException("Buffer count mismatch between metadata and native managed refs"); } - final ArrayList buffers = new ArrayList<>(); - for (int i = 0; i < batchMeta.buffersLength(); i++) { - final Buffer bufferMeta = batchMeta.buffers(i); - final KeyValue keyValue = metaMessage.customMetadata(i); // custom metadata containing native buffer refs - final byte[] refDecoded = Base64.getDecoder().decode(keyValue.value()); - final long nativeBufferRef = ByteBuffer.wrap(refDecoded).order(ByteOrder.LITTLE_ENDIAN).getLong(); - final int 
size = LargeMemoryUtil.checkedCastToInt(bufferMeta.length()); - final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator, - size, nativeBufferRef, bufferMeta.offset()); - BufferLedger ledger = am.associate(allocator); - ArrowBuf buf = new ArrowBuf(ledger, null, size, bufferMeta.offset()); - buffers.add(buf); - } + final ArrayList buffers = new ArrayList<>(); + for (int i = 0; i < batchMeta.buffersLength(); i++) { + final Buffer bufferMeta = batchMeta.buffers(i); + final KeyValue keyValue = metaMessage.customMetadata(i); // custom metadata containing native buffer refs + final byte[] refDecoded = Base64.getDecoder().decode(keyValue.value()); + final long nativeBufferRef = ByteBuffer.wrap(refDecoded).order(ByteOrder.LITTLE_ENDIAN).getLong(); + final int size = LargeMemoryUtil.checkedCastToInt(bufferMeta.length()); + final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator, + size, nativeBufferRef, bufferMeta.offset()); + ReferenceManager rm = am.createReferenceManager(allocator); + ArrowBuf buf = new ArrowBuf(rm, null, size, bufferMeta.offset()); + buffers.add(buf); + } try { final int numRows = LargeMemoryUtil.checkedCastToInt(batchMeta.length()); @@ -263,4 +260,4 @@ public void close() throws Exception { delegated.close(); } } -} \ No newline at end of file +} diff --git a/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java b/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java index 21f09a64e700d..61cf3072203de 100644 --- a/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java +++ b/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java @@ -60,8 +60,8 @@ public static NativeUnderlyingMemory create(BufferAllocator bufferAllocator, int return new NativeUnderlyingMemory(bufferAllocator, size, nativeInstanceId, address); } - public BufferLedger associate(BufferAllocator allocator) { - return super.associate(allocator); + public 
ReferenceManager createReferenceManager(BufferAllocator allocator) { + return super.associate(allocator).newReferenceManager(); } @Override diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java index 5f8ab12446ad4..4bba7947ff8bd 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -106,7 +106,7 @@ private BufferLedger associate(final BufferAllocator allocator, final boolean re return ledger; } - ledger = new BufferLedger(allocator, this); + ledger = allocator.getBufferLedgerFactory().create(allocator, this); if (retain) { // the new reference manager will have a ref count of 1 diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java index d7827073ea24c..43a271464a5d6 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java @@ -1094,6 +1094,15 @@ public long getId() { return id; } + /** + * Create a logger of this {@link ArrowBuf}. + * + * @return the newly created logger + */ + Logger createLogger() { + return new Logger(id, memoryAddress(), length, historicalLog); + } + /** * Prints information of this buffer into sb at the given * indentation and verbosity level. 
@@ -1103,12 +1112,7 @@ public long getId() { * */ public void print(StringBuilder sb, int indent, Verbosity verbosity) { - CommonUtil.indent(sb, indent).append(toString()); - - if (BaseAllocator.DEBUG && verbosity.includeHistoricalLog) { - sb.append("\n"); - historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces); - } + new Logger(id, addr, length, historicalLog).print(sb, indent, verbosity); } /** @@ -1199,4 +1203,55 @@ public ArrowBuf clear() { this.readerIndex = this.writerIndex = 0; return this; } + + /** + * Initialize the reader and writer index. + * @param readerIndex index to read from + * @param writerIndex index to write to + * @return this + */ + @Deprecated + public ArrowBuf setIndex(int readerIndex, int writerIndex) { + if (readerIndex >= 0 && readerIndex <= writerIndex && writerIndex <= this.capacity()) { + this.readerIndex = readerIndex; + this.writerIndex = writerIndex; + return this; + } else { + throw new IndexOutOfBoundsException(String.format("readerIndex: %d, writerIndex: %d " + + "(expected:0 <= readerIndex <= writerIndex <= capacity(%d))", readerIndex, writerIndex, this.capacity())); + } + } + + /** + * Create a logger for an {@link ArrowBuf}. This is currently used in debugging or historical logging + * in code of {@link BufferLedger} to avoid directly holding a strong reference to {@link ArrowBuf}. + * So that GC could be able to involved in auto cleaning logic in {@link AutoBufferLedger}. 
+ */ + static class Logger { + private final long id; + private final long addr; + private final long length; + private final HistoricalLog historicalLog; + + public Logger(long id, long addr, long length, HistoricalLog historicalLog) { + this.id = id; + this.addr = addr; + this.length = length; + this.historicalLog = historicalLog; + } + + public void print(StringBuilder sb, int indent, Verbosity verbosity) { + CommonUtil.indent(sb, indent).append(toString()); + + if (BaseAllocator.DEBUG && verbosity.includeHistoricalLog) { + sb.append("\n"); + historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces); + } + } + + @Override + public String toString() { + return String.format("ArrowBuf.Logger[%d], address:%d, length:%d", id, addr, length); + } + } } diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java new file mode 100644 index 0000000000000..cc61cb90cbe80 --- /dev/null +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.memory; + +import sun.misc.Cleaner; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * An alternative implementation of {@link BufferLedger}. The reference is auto managed by JVM garbage collector + * comparing to {@link LegacyBufferLedger}. Explicit calls to reference management methods such as + * {@link #retain()} and {@link #release()} will be ignored. + * + *

+ * Note that when this implementation is used, the exact release time of the underlying {@link AllocationManager} may become + * unpredictable because we are relying on GC to do clean-up. As a result, it's recommended to specify a very large + * allocation limit (e.g. {@link Integer#MAX_VALUE}) to the corresponding {@link BufferAllocator} to avoid + * unexpected allocation failures. + *

+ * + *

+ * Also, to let the GC be aware of these allocations when off-heap based + * {@link AllocationManager}s are used, it's required to also add the allocated sizes to the JVM direct + * memory counter (which can be limited by specifying the JVM option "-XX:MaxDirectMemorySize"). To + * achieve this one can simply set the allocator's {@link AllocationListener} to + * {@link DirectAllocationListener}. + * The JVM should then ensure that garbage collection is performed once the total reservation reaches the limit. + *

+ */ +public class AutoBufferLedger extends BufferLedger { + + public static class Factory implements BufferLedger.Factory, AutoCloseable { + private AutoBufferLedger tail = null; + + @Override + public BufferLedger create(BufferAllocator allocator, AllocationManager allocationManager) { + return new AutoBufferLedger(allocator, allocationManager, this); + } + + private void link(AutoBufferLedger ledger) { + synchronized (this) { + if (ledger.next != null || ledger.prev != null) { + throw new IllegalStateException("already linked"); + } + if (tail == null) { + tail = ledger; + return; + } + tail.next = ledger; + ledger.prev = tail; + tail = ledger; + } + } + + private void unlink(AutoBufferLedger ledger) { + synchronized (this) { + if (ledger.next == ledger) { + return; + } + if (ledger.prev == ledger) { + throw new IllegalStateException(); + } + if (ledger == tail) { + tail = ledger.prev; + } + if (ledger.prev != null) { + ledger.prev.next = ledger.next; + } + if (ledger.next != null) { + ledger.next.prev = ledger.prev; + } + ledger.prev = ledger; + ledger.next = ledger; + } + } + + @Override + public void close() { + synchronized (this) { + while (tail != null) { + final AutoBufferLedger tmp = tail.prev; + tail.destruct(); + tail = tmp; + } + } + } + } + + public static Factory newFactory() { + return new Factory(); + } + + private volatile long lDestructionTime = 0; + private final AtomicInteger refCount = new AtomicInteger(0); + private final AtomicBoolean destructed = new AtomicBoolean(false); + private final Factory factory; + + private AutoBufferLedger prev = null; + private AutoBufferLedger next = null; + + AutoBufferLedger(BufferAllocator allocator, AllocationManager allocationManager, + Factory factory) { + super(allocator, allocationManager); + this.factory = factory; + factory.link(this); + } + + @Override + protected long getDestructionTime() { + return lDestructionTime; + } + + @Override + protected ReferenceManager newReferenceManager() { + 
reserve0(); + final ReferenceManager rm = new BaseReferenceManager(this); + Cleaner.create(rm, new LedgerDeallocator()); + return rm; + } + + @Override + public int getRefCount() { + return refCount.get(); + } + + @Override + protected void increment() { + // no-op + } + + @Override + public boolean release() { + return false; + } + + @Override + public boolean release(int decrement) { + return false; + } + + @Override + public void retain() { + + } + + @Override + public void retain(int increment) { + + } + + private void reserve0() { + if (refCount.getAndAdd(1) == 0) { + // no-op + } + } + + private void release0() { + if (refCount.addAndGet(-1) == 0) { + destruct(); + } + } + + private void destruct() { + if (!destructed.compareAndSet(false, true)) { + return; + } + synchronized (getAllocationManager()) { + final AllocationManager am = getAllocationManager(); + lDestructionTime = System.nanoTime(); + am.release(this); + } + factory.unlink(this); + } + + /** + * Release hook will be invoked by JVM cleaner. 
+ * + * @see #newReferenceManager() + */ + private class LedgerDeallocator implements Runnable { + + private LedgerDeallocator() { + } + + @Override + public void run() { + release0(); + } + } +} diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java index 8d21cef7aa382..9af1cdf3e0255 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -74,6 +74,7 @@ abstract class BaseAllocator extends Accountant implements BufferAllocator { private final HistoricalLog historicalLog; private final RoundingPolicy roundingPolicy; private final AllocationManager.Factory allocationManagerFactory; + private final BufferLedger.Factory bufferLedgerFactory; private volatile boolean isClosed = false; // the allocator has been closed @@ -94,6 +95,7 @@ protected BaseAllocator( this.listener = config.getListener(); this.allocationManagerFactory = config.getAllocationManagerFactory(); + this.bufferLedgerFactory = config.getBufferLedgerFactory(); if (parentAllocator != null) { this.root = parentAllocator.root; @@ -141,6 +143,11 @@ public Collection getChildAllocators() { } } + @Override + public BufferLedger.Factory getBufferLedgerFactory() { + return bufferLedgerFactory; + } + private static String createErrorMsg(final BufferAllocator allocator, final long rounded, final long requested) { if (rounded != requested) { return String.format( @@ -343,6 +350,7 @@ public BufferAllocator newChildAllocator( .maxAllocation(maxAllocation) .roundingPolicy(roundingPolicy) .allocationManagerFactory(allocationManagerFactory) + .bufferLedgerFactory(bufferLedgerFactory) .build()); if (DEBUG) { @@ -724,7 +732,7 @@ public RoundingPolicy getRoundingPolicy() { * Config class of {@link BaseAllocator}. 
*/ @Value.Immutable - abstract static class Config { + public abstract static class Config { /** * Factory for creating {@link AllocationManager} instances. */ @@ -733,6 +741,14 @@ AllocationManager.Factory getAllocationManagerFactory() { return DefaultAllocationManagerOption.getDefaultAllocationManagerFactory(); } + /** + * Factory for creating {@link BufferLedger} instances. + */ + @Value.Default + BufferLedger.Factory getBufferLedgerFactory() { + return LegacyBufferLedger.FACTORY; + } + /** * Listener callback. Must be non-null. */ diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseReferenceManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseReferenceManager.java new file mode 100644 index 0000000000000..be72223ce8610 --- /dev/null +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseReferenceManager.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +/** + * Standard implementation of {@link ReferenceManager} backed by a + * {@link BufferLedger}. 
+ */ +public class BaseReferenceManager implements ReferenceManager { + private final BufferLedger ledger; + private final BufferAllocator allocator; + private final AllocationManager allocationManager; + + public BaseReferenceManager(BufferLedger ledger) { + this.ledger = ledger; + this.allocator = ledger.getAllocator(); + this.allocationManager = ledger.getAllocationManager(); + } + + @Override + public int getRefCount() { + return ledger.getRefCount(); + } + + @Override + public boolean release() { + return ledger.release(); + } + + @Override + public boolean release(int decrement) { + return ledger.release(decrement); + } + + @Override + public void retain() { + ledger.retain(); + } + + @Override + public void retain(int increment) { + ledger.retain(increment); + } + + /** + * Derive a new ArrowBuf from a given source ArrowBuf. The new derived + * ArrowBuf will share the same reference count as rest of the ArrowBufs + * associated with this ledger. This operation is typically used for + * slicing -- creating new ArrowBufs from a compound ArrowBuf starting at + * a particular index in the underlying memory and having access to a + * particular length (in bytes) of data in memory chunk. + *

+ * This method is also used as a helper when transferring ownership to, or retaining a buffer for, a target + * allocator. + *

+ * @param sourceBuffer source ArrowBuf + * @param index index (relative to source ArrowBuf) new ArrowBuf should be + * derived from + * @param length length (bytes) of data in underlying memory that derived buffer will + * have access to in underlying memory + * @return derived buffer + */ + @Override + public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long length) { + return ledger.deriveBuffer(sourceBuffer, index, length); + } + + /** + * Create a new ArrowBuf that is associated with an alternative allocator for the purposes of + * memory ownership and accounting. This has no impact on the reference counting for the current + * ArrowBuf except in the situation where the passed in Allocator is the same as the current buffer. + *

+ * This operation has no impact on the reference count of this ArrowBuf. The newly created + * ArrowBuf will either have a reference count of 1 (in the case that this is the first time this + * memory is being associated with the target allocator or, in other words, the allocation manager currently + * doesn't hold a mapping for the target allocator) or the current value of the reference count for + * the target allocator-reference manager combination + 1 in the case that the provided allocator + * already had an association to this underlying memory. + *

+ * + * @param srcBuffer source ArrowBuf + * @param target The target allocator to create an association with. + * @return A new ArrowBuf which shares the same underlying memory as the provided ArrowBuf. + */ + @Override + public ArrowBuf retain(final ArrowBuf srcBuffer, BufferAllocator target) { + + if (BaseAllocator.DEBUG) { + ledger.logEvent("retain(%s)", target.getName()); + } + + // the call to associate will return the corresponding reference manager (buffer ledger) for + // the target allocator. if the allocation manager didn't already have a mapping + // for the target allocator, it will create one and return the new reference manager with a + // reference count of 1. Thus the newly created buffer in this case will have a ref count of 1. + // alternatively, if there was already a mapping for in + // allocation manager, the ref count of the new buffer will be targetrefmanager.refcount() + 1 + // and this will be true for all the existing buffers currently managed by targetrefmanager + final BufferLedger targetRefManager = allocationManager.associate(target); + // create a new ArrowBuf to associate with new allocator and target ref manager + final long targetBufLength = srcBuffer.capacity(); + ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength); + targetArrowBuf.readerIndex(srcBuffer.readerIndex()); + targetArrowBuf.writerIndex(srcBuffer.writerIndex()); + return targetArrowBuf; + } + + /** + * Transfer the memory accounting ownership of this ArrowBuf to another allocator. + * This will generate a new ArrowBuf that carries an association with the underlying memory + * of this ArrowBuf. If this ArrowBuf is connected to the owning BufferLedger of this memory, + * that memory ownership/accounting will be transferred to the target allocator. If this + * ArrowBuf does not currently own the memory underlying it (and is only associated with it), + * this does not transfer any ownership to the newly created ArrowBuf. + *

+ * This operation has no impact on the reference count of this ArrowBuf. The newly created + * ArrowBuf will either have a reference count of 1 (in the case that this is the first time + * this memory is being associated with the new allocator) or the current value of the reference + * count for the other AllocationManager/BufferLedger combination + 1 in the case that the provided + * allocator already had an association to this underlying memory. + *

+ *

+ * Transfers will always succeed, even if that puts the other allocator into an overlimit + * situation. This is possible due to the fact that the original owning allocator may have + * allocated this memory out of a local reservation whereas the target allocator may need to + * allocate new memory from a parent or RootAllocator. This operation is done in a mostly-lockless + * but consistent manner. As such, the overlimit==true situation could be reported slightly before + * an actual overlimit==true condition occurs. This is simply conservative behavior which means we may + * return overlimit slightly sooner than is necessary. + *

+ * + * @param target The allocator to transfer ownership to. + * @return A new transfer result with the impact of the transfer (whether it was overlimit) as + * well as the newly created ArrowBuf. + */ + @Override + public TransferResult transferOwnership(final ArrowBuf srcBuffer, final BufferAllocator target) { + // the call to associate will return the corresponding reference manager (buffer ledger) for + // the target allocator. if the allocation manager didn't already have a mapping + // for the target allocator, it will create one and return the new reference manager with a + // reference count of 1. Thus the newly created buffer in this case will have a ref count of 1. + // alternatively, if there was already a mapping for in + // allocation manager, the ref count of the new buffer will be targetrefmanager.refcount() + 1 + // and this will be true for all the existing buffers currently managed by targetrefmanager + final BufferLedger targetLedger = allocationManager.associate(target); + // create a new ArrowBuf to associate with new allocator and target ref manager + final long targetBufLength = srcBuffer.capacity(); + final ArrowBuf targetArrowBuf = targetLedger.deriveBuffer(srcBuffer, 0, targetBufLength); + targetArrowBuf.readerIndex(srcBuffer.readerIndex()); + targetArrowBuf.writerIndex(srcBuffer.writerIndex()); + final boolean allocationFit = ledger.transferBalance(targetLedger); + return new TransferResult(allocationFit, targetArrowBuf); + } + + @Override + public BufferAllocator getAllocator() { + return ledger.getAllocator(); + } + + @Override + public long getSize() { + return ledger.getSize(); + } + + @Override + public long getAccountedSize() { + return ledger.getAccountedSize(); + } + + /** + * The outcome of a Transfer. + */ + public static class TransferResult implements OwnershipTransferResult { + + // Whether this transfer fit within the target allocator's capacity. 
+ final boolean allocationFit; + + // The newly created buffer associated with the target allocator + public final ArrowBuf buffer; + + private TransferResult(boolean allocationFit, ArrowBuf buffer) { + this.allocationFit = allocationFit; + this.buffer = buffer; + } + + @Override + public ArrowBuf getTransferredBuffer() { + return buffer; + } + + @Override + public boolean getAllocationFit() { + return allocationFit; + } + } +} diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java index e59349c6498ce..f36f9662341c6 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java @@ -175,6 +175,14 @@ BufferAllocator newChildAllocator( */ Collection getChildAllocators(); + + /** + * Returns {@link BufferLedger.Factory} used by this allocator. + * + * @return the buffer ledger factory + */ + BufferLedger.Factory getBufferLedgerFactory(); + /** * Create an allocation reservation. A reservation is a way of building up * a request for a buffer whose size is not known in advance. See diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java index 48b3e183d5ae0..42bd959c2554e 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java @@ -18,7 +18,6 @@ package org.apache.arrow.memory; import java.util.IdentityHashMap; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.arrow.memory.util.CommonUtil; @@ -31,13 +30,12 @@ * ArrowBufs managed by this reference manager share a common * fate (same reference count). 
*/ -public class BufferLedger implements ValueWithKeyIncluded, ReferenceManager { - private final IdentityHashMap buffers = +public abstract class BufferLedger implements ValueWithKeyIncluded, ReferenceCountAware { + private final IdentityHashMap buffers = BaseAllocator.DEBUG ? new IdentityHashMap<>() : null; private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0); // unique ID assigned to each ledger private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet(); - private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can // manage request for retain // correctly private final long lCreationTime = System.nanoTime(); @@ -46,165 +44,73 @@ public class BufferLedger implements ValueWithKeyIncluded, Refe private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "BufferLedger[%d]", 1) : null; - private volatile long lDestructionTime = 0; BufferLedger(final BufferAllocator allocator, final AllocationManager allocationManager) { this.allocator = allocator; this.allocationManager = allocationManager; } - boolean isOwningLedger() { - return this == allocationManager.getOwningLedger(); - } - - public BufferAllocator getKey() { - return allocator; - } - - /** - * Get the buffer allocator associated with this reference manager. - * @return buffer allocator - */ - @Override - public BufferAllocator getAllocator() { - return allocator; - } - - /** - * Get this ledger's reference count. - * @return reference count - */ - @Override - public int getRefCount() { - return bufRefCnt.get(); - } - /** * Increment the ledger's reference count for the associated * underlying memory chunk. All ArrowBufs managed by this ledger * will share the ref count. */ - void increment() { - bufRefCnt.incrementAndGet(); - } + protected abstract void increment(); + /** - * Decrement the ledger's reference count by 1 for the associated underlying - * memory chunk. 
If the reference count drops to 0, it implies that - * no ArrowBufs managed by this reference manager need access to the memory - * chunk. In that case, the ledger should inform the allocation manager - * about releasing its ownership for the chunk. Whether or not the memory - * chunk will be released is something that {@link AllocationManager} will - * decide since tracks the usage of memory chunk across multiple reference - * managers and allocators. - * @return true if the new ref count has dropped to 0, false otherwise + * Get destruction time of this buffer ledger. + * + * @return destruction time in nano, 0 if the ledger is not destructed yet */ - @Override - public boolean release() { - return release(1); - } + protected abstract long getDestructionTime(); + /** - * Decrement the ledger's reference count for the associated underlying - * memory chunk. If the reference count drops to 0, it implies that - * no ArrowBufs managed by this reference manager need access to the memory - * chunk. In that case, the ledger should inform the allocation manager - * about releasing its ownership for the chunk. Whether or not the memory - * chunk will be released is something that {@link AllocationManager} will - * decide since tracks the usage of memory chunk across multiple reference - * managers and allocators. - * @param decrement amount to decrease the reference count by - * @return true if the new ref count has dropped to 0, false otherwise + * Create new instance of {@link ReferenceManager} using this ledger. + * + * @return the newly created instance of {@link ReferenceManager} */ - @Override - public boolean release(int decrement) { - Preconditions.checkState(decrement >= 1, - "ref count decrement should be greater than or equal to 1"); - // decrement the ref count - final int refCnt = decrement(decrement); - if (BaseAllocator.DEBUG) { - historicalLog.recordEvent("release(%d). 
original value: %d", - decrement, refCnt + decrement); - } - // the new ref count should be >= 0 - Preconditions.checkState(refCnt >= 0, "RefCnt has gone negative"); - return refCnt == 0; - } + protected abstract ReferenceManager newReferenceManager(); /** - * Decrement the ledger's reference count for the associated underlying - * memory chunk. If the reference count drops to 0, it implies that - * no ArrowBufs managed by this reference manager need access to the memory - * chunk. In that case, the ledger should inform the allocation manager - * about releasing its ownership for the chunk. Whether or not the memory - * chunk will be released is something that {@link AllocationManager} will - * decide since tracks the usage of memory chunk across multiple reference - * managers and allocators. + * Used by an allocator to create a new ArrowBuf. This is provided + * as a helper method for the allocator when it allocates a new memory chunk + * using a new instance of allocation manager and creates a new reference manager + * too. * - * @param decrement amount to decrease the reference count by - * @return the new reference count + * @param length The length in bytes that this ArrowBuf will provide access to. 
+ * @param manager An optional BufferManager argument that can be used to manage expansion of + * this ArrowBuf + * @return A new ArrowBuf that shares references with all ArrowBufs associated + * with this BufferLedger */ - private int decrement(int decrement) { + ArrowBuf newArrowBuf(final long length, final BufferManager manager) { allocator.assertOpen(); - final int outcome; - synchronized (allocationManager) { - outcome = bufRefCnt.addAndGet(-decrement); - if (outcome == 0) { - lDestructionTime = System.nanoTime(); - // refcount of this reference manager has dropped to 0 - // inform the allocation manager that this reference manager - // no longer holds references to underlying memory - allocationManager.release(this); - } - } - return outcome; - } - /** - * Increment the ledger's reference count for associated - * underlying memory chunk by 1. - */ - @Override - public void retain() { - retain(1); - } + // the start virtual address of the ArrowBuf will be same as address of memory chunk + final long startAddress = allocationManager.memoryAddress(); - /** - * Increment the ledger's reference count for associated - * underlying memory chunk by the given amount. 
- * - * @param increment amount to increase the reference count by - */ - @Override - public void retain(int increment) { - Preconditions.checkArgument(increment > 0, "retain(%s) argument is not positive", increment); + // create ArrowBuf + final ArrowBuf buf = new ArrowBuf(newReferenceManager(), manager, length, startAddress); + + // logging if (BaseAllocator.DEBUG) { - historicalLog.recordEvent("retain(%d)", increment); + logEvent( + "ArrowBuf(BufferLedger, BufferAllocator[%s], " + + "UnsafeDirectLittleEndian[identityHashCode == " + "%d](%s)) => ledger hc == %d", + allocator.getName(), System.identityHashCode(buf), buf.toString(), + System.identityHashCode(this)); + + synchronized (buffers) { + buffers.put(buf.createLogger(), null); + } } - final int originalReferenceCount = bufRefCnt.getAndAdd(increment); - Preconditions.checkArgument(originalReferenceCount > 0); + + return buf; } - /** - * Derive a new ArrowBuf from a given source ArrowBuf. The new derived - * ArrowBuf will share the same reference count as rest of the ArrowBufs - * associated with this ledger. This operation is typically used for - * slicing -- creating new ArrowBufs from a compound ArrowBuf starting at - * a particular index in the underlying memory and having access to a - * particular length (in bytes) of data in memory chunk. - *

- * This method is also used as a helper for transferring ownership and retain to target - * allocator. - *

- * @param sourceBuffer source ArrowBuf - * @param index index (relative to source ArrowBuf) new ArrowBuf should be - * derived from - * @param length length (bytes) of data in underlying memory that derived buffer will - * have access to in underlying memory - * @return derived buffer - */ - @Override - public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long length) { + ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long length) { /* * Usage type 1 for deriveBuffer(): * Used for slicing where index represents a relative index in the source ArrowBuf @@ -226,7 +132,7 @@ public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long lengt // create new ArrowBuf final ArrowBuf derivedBuf = new ArrowBuf( - this, + newReferenceManager(), null, length, // length (in bytes) in the underlying memory chunk for this new ArrowBuf derivedBufferAddress // starting byte address in the underlying memory for this new ArrowBuf @@ -234,7 +140,7 @@ public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long lengt // logging if (BaseAllocator.DEBUG) { - historicalLog.recordEvent( + logEvent( "ArrowBuf(BufferLedger, BufferAllocator[%s], " + "UnsafeDirectLittleEndian[identityHashCode == " + "%d](%s)) => ledger hc == %d", @@ -242,109 +148,32 @@ public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long lengt System.identityHashCode(this)); synchronized (buffers) { - buffers.put(derivedBuf, null); + buffers.put(derivedBuf.createLogger(), null); } } return derivedBuf; } - /** - * Used by an allocator to create a new ArrowBuf. This is provided - * as a helper method for the allocator when it allocates a new memory chunk - * using a new instance of allocation manager and creates a new reference manager - * too. - * - * @param length The length in bytes that this ArrowBuf will provide access to. 
- * @param manager An optional BufferManager argument that can be used to manage expansion of - * this ArrowBuf - * @return A new ArrowBuf that shares references with all ArrowBufs associated - * with this BufferLedger - */ - ArrowBuf newArrowBuf(final long length, final BufferManager manager) { - allocator.assertOpen(); - - // the start virtual address of the ArrowBuf will be same as address of memory chunk - final long startAddress = allocationManager.memoryAddress(); - - // create ArrowBuf - final ArrowBuf buf = new ArrowBuf(this, manager, length, startAddress); - - // logging - if (BaseAllocator.DEBUG) { - historicalLog.recordEvent( - "ArrowBuf(BufferLedger, BufferAllocator[%s], " + - "UnsafeDirectLittleEndian[identityHashCode == " + "%d](%s)) => ledger hc == %d", - allocator.getName(), System.identityHashCode(buf), buf.toString(), - System.identityHashCode(this)); - - synchronized (buffers) { - buffers.put(buf, null); - } - } - - return buf; - } - - /** - * Create a new ArrowBuf that is associated with an alternative allocator for the purposes of - * memory ownership and accounting. This has no impact on the reference counting for the current - * ArrowBuf except in the situation where the passed in Allocator is the same as the current buffer. - *

- * This operation has no impact on the reference count of this ArrowBuf. The newly created - * ArrowBuf with either have a reference count of 1 (in the case that this is the first time this - * memory is being associated with the target allocator or in other words allocation manager currently - * doesn't hold a mapping for the target allocator) or the current value of the reference count for - * the target allocator-reference manager combination + 1 in the case that the provided allocator - * already had an association to this underlying memory. - *

- * - * @param srcBuffer source ArrowBuf - * @param target The target allocator to create an association with. - * @return A new ArrowBuf which shares the same underlying memory as the provided ArrowBuf. - */ - @Override - public ArrowBuf retain(final ArrowBuf srcBuffer, BufferAllocator target) { - - if (BaseAllocator.DEBUG) { - historicalLog.recordEvent("retain(%s)", target.getName()); - } - - // the call to associate will return the corresponding reference manager (buffer ledger) for - // the target allocator. if the allocation manager didn't already have a mapping - // for the target allocator, it will create one and return the new reference manager with a - // reference count of 1. Thus the newly created buffer in this case will have a ref count of 1. - // alternatively, if there was already a mapping for in - // allocation manager, the ref count of the new buffer will be targetrefmanager.refcount() + 1 - // and this will be true for all the existing buffers currently managed by targetrefmanager - final BufferLedger targetRefManager = allocationManager.associate(target); - // create a new ArrowBuf to associate with new allocator and target ref manager - final long targetBufLength = srcBuffer.capacity(); - ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength); - targetArrowBuf.readerIndex(srcBuffer.readerIndex()); - targetArrowBuf.writerIndex(srcBuffer.writerIndex()); - return targetArrowBuf; - } - /** * Transfer any balance the current ledger has to the target ledger. In the case * that the current ledger holds no memory, no transfer is made to the new ledger. * - * @param targetReferenceManager The ledger to transfer ownership account to. + * @param targetLedger The ledger to transfer ownership account to. * @return Whether transfer fit within target ledgers limits. 
*/ - boolean transferBalance(final ReferenceManager targetReferenceManager) { - Preconditions.checkArgument(targetReferenceManager != null, + boolean transferBalance(final BufferLedger targetLedger) { + Preconditions.checkArgument(targetLedger != null, "Expecting valid target reference manager"); - final BufferAllocator targetAllocator = targetReferenceManager.getAllocator(); + final BufferAllocator targetAllocator = targetLedger.getAllocator(); Preconditions.checkArgument(allocator.getRoot() == targetAllocator.getRoot(), "You can only transfer between two allocators that share the same root."); allocator.assertOpen(); - targetReferenceManager.getAllocator().assertOpen(); + targetLedger.getAllocator().assertOpen(); // if we're transferring to ourself, just return. - if (targetReferenceManager == this) { + if (targetLedger == this) { return true; } @@ -360,8 +189,8 @@ boolean transferBalance(final ReferenceManager targetReferenceManager) { } if (BaseAllocator.DEBUG) { - this.historicalLog.recordEvent("transferBalance(%s)", - targetReferenceManager.getAllocator().getName()); + logEvent("transferBalance(%s)", + targetLedger.getAllocator().getName()); } boolean overlimit = targetAllocator.forceAllocate(allocationManager.getSize()); @@ -369,90 +198,15 @@ boolean transferBalance(final ReferenceManager targetReferenceManager) { // since the transfer can only happen from the owning reference manager, // we need to set the target ref manager as the new owning ref manager // for the chunk of memory in allocation manager - allocationManager.setOwningLedger((BufferLedger) targetReferenceManager); + allocationManager.setOwningLedger(targetLedger); return overlimit; } } - /** - * Transfer the memory accounting ownership of this ArrowBuf to another allocator. - * This will generate a new ArrowBuf that carries an association with the underlying memory - * of this ArrowBuf. 
If this ArrowBuf is connected to the owning BufferLedger of this memory, - * that memory ownership/accounting will be transferred to the target allocator. If this - * ArrowBuf does not currently own the memory underlying it (and is only associated with it), - * this does not transfer any ownership to the newly created ArrowBuf. - *

- * This operation has no impact on the reference count of this ArrowBuf. The newly created - * ArrowBuf with either have a reference count of 1 (in the case that this is the first time - * this memory is being associated with the new allocator) or the current value of the reference - * count for the other AllocationManager/BufferLedger combination + 1 in the case that the provided - * allocator already had an association to this underlying memory. - *

- *

- * Transfers will always succeed, even if that puts the other allocator into an overlimit - * situation. This is possible due to the fact that the original owning allocator may have - * allocated this memory out of a local reservation whereas the target allocator may need to - * allocate new memory from a parent or RootAllocator. This operation is done n a mostly-lockless - * but consistent manner. As such, the overlimit==true situation could occur slightly prematurely - * to an actual overlimit==true condition. This is simply conservative behavior which means we may - * return overlimit slightly sooner than is necessary. - *

- * - * @param target The allocator to transfer ownership to. - * @return A new transfer result with the impact of the transfer (whether it was overlimit) as - * well as the newly created ArrowBuf. - */ - @Override - public TransferResult transferOwnership(final ArrowBuf srcBuffer, final BufferAllocator target) { - // the call to associate will return the corresponding reference manager (buffer ledger) for - // the target allocator. if the allocation manager didn't already have a mapping - // for the target allocator, it will create one and return the new reference manager with a - // reference count of 1. Thus the newly created buffer in this case will have a ref count of 1. - // alternatively, if there was already a mapping for in - // allocation manager, the ref count of the new buffer will be targetrefmanager.refcount() + 1 - // and this will be true for all the existing buffers currently managed by targetrefmanager - final BufferLedger targetRefManager = allocationManager.associate(target); - // create a new ArrowBuf to associate with new allocator and target ref manager - final long targetBufLength = srcBuffer.capacity(); - final ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength); - targetArrowBuf.readerIndex(srcBuffer.readerIndex()); - targetArrowBuf.writerIndex(srcBuffer.writerIndex()); - final boolean allocationFit = transferBalance(targetRefManager); - return new TransferResult(allocationFit, targetArrowBuf); - } - - /** - * The outcome of a Transfer. - */ - public class TransferResult implements OwnershipTransferResult { - - // Whether this transfer fit within the target allocator's capacity. 
- final boolean allocationFit; - - // The newly created buffer associated with the target allocator - public final ArrowBuf buffer; - - private TransferResult(boolean allocationFit, ArrowBuf buffer) { - this.allocationFit = allocationFit; - this.buffer = buffer; - } - - @Override - public ArrowBuf getTransferredBuffer() { - return buffer; - } - - @Override - public boolean getAllocationFit() { - return allocationFit; - } - } - /** * Total size (in bytes) of memory underlying this reference manager. * @return Size (in bytes) of the memory chunk */ - @Override public long getSize() { return allocationManager.getSize(); } @@ -463,7 +217,6 @@ public long getSize() { * is not the owning ledger associated with this memory. * @return Amount of accounted(owned) memory associated with this ledger. */ - @Override public long getAccountedSize() { synchronized (allocationManager) { if (allocationManager.getOwningLedger() == this) { @@ -490,11 +243,11 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) { .append("), isOwning: ") .append(", size: ") .append(", references: ") - .append(bufRefCnt.get()) + .append(getRefCount()) .append(", life: ") .append(lCreationTime) .append("..") - .append(lDestructionTime) + .append(getDestructionTime()) .append(", allocatorManager: [") .append(", life: "); @@ -505,8 +258,8 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) { sb.append("] holds ") .append(buffers.size()) .append(" buffers. \n"); - for (ArrowBuf buf : buffers.keySet()) { - buf.print(sb, indent + 2, verbosity); + for (ArrowBuf.Logger bufLogger : buffers.keySet()) { + bufLogger.print(sb, indent + 2, verbosity); sb.append('\n'); } } @@ -518,8 +271,54 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) { * * @return The AllocationManager used by this BufferLedger. 
*/ - public AllocationManager getAllocationManager() { + AllocationManager getAllocationManager() { return allocationManager; } + /** + * Record a single log line into this ledger's historical log. + */ + protected void logEvent(final String noteFormat, Object... args) { + historicalLog.recordEvent(noteFormat, args); + } + + /** + * If this ledger is the owning ledger of the underlying allocation manager. + * + * @return true if this ledger owns its allocation manager + */ + boolean isOwningLedger() { + return this == allocationManager.getOwningLedger(); + } + + /** + * Get the buffer allocator associated with this reference manager. + * @return buffer allocator + */ + BufferAllocator getAllocator() { + return allocator; + } + + /** + * Get allocator key. Used by {@link LowCostIdentityHashMap}. + */ + public BufferAllocator getKey() { + return allocator; + } + + /** + * A factory interface for creating {@link BufferLedger}. + */ + public interface Factory { + /** + * Create an instance of {@link BufferLedger}. + * + * @param allocator The allocator that will bind to the newly created {@link BufferLedger}. + * @param allocationManager The {@link AllocationManager} that actually holds the underlying + * memory block. Note that the newly created {@link BufferLedger} will + * not be the one that actually owns this piece of memory by default. + * @return The created {@link BufferLedger}. + */ + BufferLedger create(BufferAllocator allocator, AllocationManager allocationManager); + } } diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DirectAllocationListener.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DirectAllocationListener.java new file mode 100644 index 0000000000000..0a4bd46e773b2 --- /dev/null +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DirectAllocationListener.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import org.apache.arrow.memory.util.MemoryUtil; + +/** + * {@link AllocationListener} implementation to reserve bytes on JVM direct memory. + */ +public class DirectAllocationListener implements AllocationListener { + + public static final DirectAllocationListener INSTANCE = new DirectAllocationListener(); + + @Override + public void onPreAllocation(long size) { + MemoryUtil.reserveDirectMemory(size); + } + + @Override + public void onRelease(long size) { + MemoryUtil.unreserveDirectMemory(size); + } +} diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/LegacyBufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/LegacyBufferLedger.java new file mode 100644 index 0000000000000..76e69f9257076 --- /dev/null +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/LegacyBufferLedger.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.arrow.util.Preconditions; + +/** + * Legacy implementation of {@link BufferLedger}. The reference count should be manually managed by + * explicitly invoking methods {@link #retain()} and {@link #release()}, etc. + */ +public class LegacyBufferLedger extends BufferLedger { + + public static final Factory FACTORY = new Factory() { + @Override + public BufferLedger create(BufferAllocator allocator, AllocationManager allocationManager) { + return new LegacyBufferLedger(allocator, allocationManager); + } + }; + + private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can manage request for retain correctly + private volatile long lDestructionTime = 0; + + LegacyBufferLedger(BufferAllocator allocator, AllocationManager allocationManager) { + super(allocator, allocationManager); + } + + /** + * Get this ledger's reference count. 
+ * @return reference count + */ + @Override + public int getRefCount() { + return bufRefCnt.get(); + } + + + @Override + protected void increment() { + bufRefCnt.incrementAndGet(); + } + + @Override + protected long getDestructionTime() { + return lDestructionTime; + } + + @Override + protected ReferenceManager newReferenceManager() { + return new BaseReferenceManager(this); + } + + /** + * Decrement the ledger's reference count for the associated underlying + * memory chunk. If the reference count drops to 0, it implies that + * no ArrowBufs managed by this reference manager need access to the memory + * chunk. In that case, the ledger should inform the allocation manager + * about releasing its ownership for the chunk. Whether or not the memory + * chunk will be released is something that {@link AllocationManager} will + * decide since it tracks the usage of memory chunk across multiple reference + * managers and allocators. + * + * @param decrement amount to decrease the reference count by + * @return the new reference count + */ + private int decrement(int decrement) { + getAllocator().assertOpen(); + final int outcome; + synchronized (getAllocationManager()) { + outcome = bufRefCnt.addAndGet(-decrement); + if (outcome == 0) { + lDestructionTime = System.nanoTime(); + // refcount of this reference manager has dropped to 0 + // inform the allocation manager that this reference manager + // no longer holds references to underlying memory + getAllocationManager().release(this); + } + } + return outcome; + } + + /** + * Decrement the ledger's reference count by 1 for the associated underlying + * memory chunk. If the reference count drops to 0, it implies that + * no ArrowBufs managed by this reference manager need access to the memory + * chunk. In that case, the ledger should inform the allocation manager + * about releasing its ownership for the chunk. 
Whether or not the memory + * chunk will be released is something that {@link AllocationManager} will + * decide since it tracks the usage of memory chunk across multiple reference + * managers and allocators. + * @return true if the new ref count has dropped to 0, false otherwise + */ + @Override + public boolean release() { + return release(1); + } + + /** + * Decrement the ledger's reference count for the associated underlying + * memory chunk. If the reference count drops to 0, it implies that + * no ArrowBufs managed by this reference manager need access to the memory + * chunk. In that case, the ledger should inform the allocation manager + * about releasing its ownership for the chunk. Whether or not the memory + * chunk will be released is something that {@link AllocationManager} will + * decide since it tracks the usage of memory chunk across multiple reference + * managers and allocators. + * @param decrement amount to decrease the reference count by + * @return true if the new ref count has dropped to 0, false otherwise + */ + @Override + public boolean release(int decrement) { + Preconditions.checkState(decrement >= 1, + "ref count decrement should be greater than or equal to 1"); + // decrement the ref count + final int refCnt = decrement(decrement); + if (BaseAllocator.DEBUG) { + logEvent("release(%d). original value: %d", + decrement, refCnt + decrement); + } + // the new ref count should be >= 0 + Preconditions.checkState(refCnt >= 0, "RefCnt has gone negative"); + return refCnt == 0; + } + + /** + * Increment the ledger's reference count for associated + * underlying memory chunk by 1. + */ + @Override + public void retain() { + retain(1); + } + + /** + * Increment the ledger's reference count for associated + * underlying memory chunk by the given amount. 
+ * + * @param increment amount to increase the reference count by + */ + @Override + public void retain(int increment) { + Preconditions.checkArgument(increment > 0, "retain(%s) argument is not positive", increment); + if (BaseAllocator.DEBUG) { + logEvent("retain(%d)", increment); + } + final int originalReferenceCount = bufRefCnt.getAndAdd(increment); + Preconditions.checkArgument(originalReferenceCount > 0); + } + + + +} diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceCountAware.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceCountAware.java new file mode 100644 index 0000000000000..67aec2a8444c4 --- /dev/null +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceCountAware.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +/** + * Base interface for reference counted facilities. + */ +public interface ReferenceCountAware { + /** + * Get current reference count. + * + * @return current reference count + */ + int getRefCount(); + + /** + * Decrement reference count by 1. 
+ * + * @return true if reference count has dropped to 0 + */ + boolean release(); + + /** + * Decrement reference count by specific amount of decrement. + * + * @param decrement the count to decrease the reference count by + * @return true if reference count has dropped to 0 + */ + boolean release(int decrement); + + /** + * Increment reference count by 1. + */ + void retain(); + + /** + * Increment reference count by specific amount of increment. + * + * @param increment the count to increase the reference count by + */ + void retain(int increment); +} diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java index 00ae274b744d7..812ef76bfb049 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java @@ -21,43 +21,7 @@ * Reference Manager manages one or more ArrowBufs that share the * reference count for the underlying memory chunk. */ -public interface ReferenceManager { - - /** - * Return the reference count. - * @return reference count - */ - int getRefCount(); - - /** - * Decrement this reference manager's reference count by 1 for the associated underlying - * memory. If the reference count drops to 0, it implies that ArrowBufs managed by this - * reference manager no longer need access to the underlying memory - * @return true if ref count has dropped to 0, false otherwise - */ - boolean release(); - - /** - * Decrement this reference manager's reference count for the associated underlying - * memory. 
If the reference count drops to 0, it implies that ArrowBufs managed by this - * reference manager no longer need access to the underlying memory - * @param decrement the count to decrease the reference count by - * @return the new reference count - */ - boolean release(int decrement); - - /** - * Increment this reference manager's reference count by 1 for the associated underlying - * memory. - */ - void retain(); - - /** - * Increment this reference manager's reference count by a given amount for the - * associated underlying memory. - * @param increment the count to increase the reference count by - */ - void retain(int increment); +public interface ReferenceManager extends ReferenceCountAware { /** * Create a new ArrowBuf that is associated with an alternative allocator for the purposes of diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java index 16ef39702ca3e..d87b116d653ee 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java @@ -20,10 +20,14 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.arrow.util.VisibleForTesting; import sun.misc.Unsafe; @@ -34,6 +38,10 @@ public class MemoryUtil { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryUtil.class); private static final Constructor DIRECT_BUFFER_CONSTRUCTOR; + private static final Method DIRECT_MEMORY_RESERVE; + private static final Method DIRECT_MEMORY_UNRESERVE; + private static final Field 
DIRECT_MEMORY_COUNTER; + /** * The unsafe object from which to access the off-heap memory. */ @@ -132,6 +140,48 @@ public Object run() { } } DIRECT_BUFFER_CONSTRUCTOR = directBufferConstructor; + + DIRECT_MEMORY_RESERVE = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Method run() { + try { + final Class classBits = Class.forName("java.nio.Bits"); + Method methodReserve = classBits.getDeclaredMethod("reserveMemory", long.class, int.class); + methodReserve.setAccessible(true); + return methodReserve; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + DIRECT_MEMORY_UNRESERVE = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Method run() { + try { + final Class classBits = Class.forName("java.nio.Bits"); + Method methodUnreserve = classBits.getDeclaredMethod("unreserveMemory", long.class, int.class); + methodUnreserve.setAccessible(true); + return methodUnreserve; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + DIRECT_MEMORY_COUNTER = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Field run() { + try { + final Class classBits = Class.forName("java.nio.Bits"); + final Field f = classBits.getDeclaredField("reservedMemory"); + f.setAccessible(true); + return f; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); } catch (Throwable e) { throw new RuntimeException("Failed to initialize MemoryUtil.", e); } @@ -167,4 +217,48 @@ public static ByteBuffer directBuffer(long address, int capacity) { throw new UnsupportedOperationException( "sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available"); } + + /** + * Reserve bytes from JVM direct memory. Garbage collection will be triggered once + * the total reserved amount reaches the limit specified via JVM option "-XX:MaxDirectMemorySize". 
+ * + * @param size size in bytes to reserve + */ + public static void reserveDirectMemory(long size) { + try { + if (size > Integer.MAX_VALUE) { + throw new IllegalArgumentException("reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)"); + } + DIRECT_MEMORY_RESERVE.invoke(null, (int) size, (int) size); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Unreserve bytes from JVM direct memory. + * + * @param size size in bytes to unreserve; must not exceed Integer.MAX_VALUE + */ + public static void unreserveDirectMemory(long size) { + try { + if (size > Integer.MAX_VALUE) { + throw new IllegalArgumentException("unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)"); + } + DIRECT_MEMORY_UNRESERVE.invoke(null, (int) size, (int) size); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Get the current reservation of JVM direct memory, read from java.nio.Bits#reservedMemory. Visible for testing. + */ + public static long getCurrentDirectMemReservation() { + try { + return ((AtomicLong) DIRECT_MEMORY_COUNTER.get(null)).get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAutoBufferLedger.java b/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAutoBufferLedger.java new file mode 100644 index 0000000000000..619fec5029c7e --- /dev/null +++ b/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAutoBufferLedger.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.TimeUnit; + +import org.apache.arrow.memory.util.MemoryUtil; +import org.junit.BeforeClass; +import org.junit.Test; + +import sun.misc.JavaLangRefAccess; +import sun.misc.SharedSecrets; + +public class TestAutoBufferLedger { + + private static final int MAX_ALLOCATION = Integer.MAX_VALUE; + private static RootAllocator root; + + @BeforeClass + public static void beforeClass() { + root = new RootAllocator( + BaseAllocator.configBuilder() + .maxAllocation(MAX_ALLOCATION) + .bufferLedgerFactory(AutoBufferLedger.newFactory()) + .listener(DirectAllocationListener.INSTANCE) + .build()); + cleanUpJvmReferences(); + } + + @Test + public void testBufferAllocation() { + final BufferAllocator allocator = root.newChildAllocator("TEST-CHILD", 0, MAX_ALLOCATION); + ArrowBuf buf = allocator.buffer(2L); + assertEquals(2L, buf.capacity()); + assertEquals(2L, allocator.getAllocatedMemory()); + } + + @Test + public void testBufferDerivation() { + BufferAllocator allocator = root.newChildAllocator("TEST-CHILD", 0, MAX_ALLOCATION); + ArrowBuf buf = allocator.buffer(2); + assertEquals(2, buf.capacity()); + assertEquals(1, buf.slice(1, 1).capacity()); + assertEquals(2, buf.slice(0, 2).capacity()); + assertEquals(2L, allocator.getAllocatedMemory()); + } + + @Test + public void testBufferDeallocation() { + final BufferAllocator allocator = root.newChildAllocator("TEST-CHILD", 0, MAX_ALLOCATION); + ArrowBuf buf = allocator.buffer(2L); + assertEquals(2L, 
buf.capacity()); + assertEquals(2L, allocator.getAllocatedMemory()); + + // AutoBufferLedger ignores all release operations here, so the capacity must stay unchanged. + buf.getReferenceManager().release(); + assertEquals(2L, buf.capacity()); + } + + @Test + public void testDirectMemoryReservation() { + final BufferAllocator allocator = root.newChildAllocator("TEST-CHILD", 0, MAX_ALLOCATION); + long prevAlloc = MemoryUtil.getCurrentDirectMemReservation(); + allocator.buffer(2L); + long alloc = MemoryUtil.getCurrentDirectMemReservation(); + assertEquals(2L, alloc - prevAlloc); + } + + @Test + public void testManualGC() { + final BufferAllocator allocator = root.newChildAllocator("TEST-CHILD", 0, MAX_ALLOCATION); + ArrowBuf buf = allocator.buffer(2L); + assertEquals(2L, allocator.getAllocatedMemory()); + buf = null; // drop the reference so the buffer can be discovered by the garbage collector + cleanUpJvmReferences(); + assertEquals(0L, allocator.getAllocatedMemory()); + } + + @Test + public void testManualGCOnSharing() { + final BufferAllocator allocator = root.newChildAllocator("TEST-CHILD", 0, MAX_ALLOCATION); + ArrowBuf buf = allocator.buffer(2L); + ArrowBuf sliced1 = buf.slice(1, 1); + ArrowBuf sliced2 = buf.slice(0, 2); + assertEquals(2L, allocator.getAllocatedMemory()); + buf = null; + cleanUpJvmReferences(); + assertEquals(2L, allocator.getAllocatedMemory()); + sliced1 = null; + cleanUpJvmReferences(); + assertEquals(2L, allocator.getAllocatedMemory()); + sliced2 = null; + cleanUpJvmReferences(); + assertEquals(0L, allocator.getAllocatedMemory()); + } + + @Test + public void testManualGCOnCrossAllocatorSharing() { + final BufferAllocator allocator1 = root.newChildAllocator("TEST-CHILD-1", 0, MAX_ALLOCATION); + final BufferAllocator allocator2 = root.newChildAllocator("TEST-CHILD-2", 0, MAX_ALLOCATION); + ArrowBuf buf = allocator1.buffer(2L); + ArrowBuf other = buf.getReferenceManager().retain(buf, allocator2); + assertEquals(2L, allocator1.getAllocatedMemory()); + assertEquals(0L, 
allocator2.getAllocatedMemory()); + buf = null; + cleanUpJvmReferences(); + assertEquals(0L, allocator1.getAllocatedMemory()); + assertEquals(2L, allocator2.getAllocatedMemory()); + other = null; + cleanUpJvmReferences(); + assertEquals(0L, allocator1.getAllocatedMemory()); + assertEquals(0L, allocator2.getAllocatedMemory()); + } + + @Test + public void testManualGCWithinDirectMemoryReservation() { + final BufferAllocator allocator1 = root.newChildAllocator("TEST-CHILD-1", 0, MAX_ALLOCATION); + final BufferAllocator allocator2 = root.newChildAllocator("TEST-CHILD-2", 0, MAX_ALLOCATION); + long prevAlloc = MemoryUtil.getCurrentDirectMemReservation(); + ArrowBuf buffer1 = allocator1.buffer(2L); + ArrowBuf buffer2 = buffer1.getReferenceManager().retain(buffer1, allocator2); + long alloc1 = MemoryUtil.getCurrentDirectMemReservation(); + assertEquals(2L, alloc1 - prevAlloc); + buffer1 = null; + cleanUpJvmReferences(); + long alloc2 = MemoryUtil.getCurrentDirectMemReservation(); + assertEquals(2L, alloc2 - prevAlloc); + buffer2 = null; + cleanUpJvmReferences(); + long alloc3 = MemoryUtil.getCurrentDirectMemReservation(); + assertEquals(prevAlloc, alloc3); + } + + @Test + public void testFactoryClose() { + final AutoBufferLedger.Factory factory = AutoBufferLedger.newFactory(); + final BufferAllocator alloc = new RootAllocator( + BaseAllocator.configBuilder() + .maxAllocation(MAX_ALLOCATION) + .bufferLedgerFactory(factory) + .build()); + ArrowBuf buf = alloc.buffer(2); + assertEquals(2, buf.capacity()); + assertEquals(1, buf.slice(1, 1).capacity()); + assertEquals(2, buf.slice(0, 2).capacity()); + assertEquals(2L, alloc.getAllocatedMemory()); + factory.close(); + assertEquals(0L, alloc.getAllocatedMemory()); + } + + private static void cleanUpJvmReferences() { + final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess(); + System.gc(); + long prev = System.nanoTime(); + long sleep = 1L; + while (true) { + long elapsed = System.nanoTime() - prev; + if 
(TimeUnit.NANOSECONDS.toMillis(elapsed) > 500L) { + break; + } + if (!jlra.tryHandlePendingReference()) { + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + sleep = sleep << 1; + } + } + } +}