From 6bb9cfc1681a2d01ac66e1bdebdf497afce6b08c Mon Sep 17 00:00:00 2001 From: siddharth Date: Mon, 6 May 2019 11:10:51 -0700 Subject: [PATCH] Cleanup --- .../main/java/io/netty/buffer/ArrowBuf.java | 65 ++++++++++- .../org/apache/arrow/memory/BufferLedger.java | 103 ++++++++---------- 2 files changed, 103 insertions(+), 65 deletions(-) diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index d5022a7cd172e..ae7d354b677eb 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -595,11 +595,23 @@ public void writeByte(int value) { ++writerIndex; } + /** + * Write the bytes from given byte array into this + * ArrowBuf starting at writerIndex. + * @param src src byte array + */ public void writeBytes(byte[] src) { Preconditions.checkArgument(src != null, "expecting valid src array"); writeBytes(src, 0, src.length); } + /** + * Write the bytes from given byte array starting at srcIndex + * into this ArrowBuf starting at writerIndex. + * @param src src byte array + * @param srcIndex index in the byte array where the copy will being from + * @param length length of data to copy + */ public void writeBytes(byte[] src, int srcIndex, int length) { ensureWritable(length); setBytes(writerIndex, src, srcIndex, length); @@ -946,17 +958,27 @@ public void setBytes(int index, ArrowBuf src, int srcIndex, int length) { * @param src src ArrowBuf where the data will be copied from */ public void setBytes(int index, ArrowBuf src) { + // null check + Preconditions.checkArgument(src != null, "expecting valid ArrowBuf"); final int length = src.readableBytes(); // bound check for this ArrowBuf where the data will be copied into checkIndex(index, length); - // null check - Preconditions.checkArgument(src != null, "expecting valid ArrowBuf"); final long srcAddress = src.memoryAddress() + (long)src.readerIndex; final long dstAddress = addr(index); PlatformDependent.copyMemory(srcAddress, dstAddress, (long)length); src.readerIndex(src.readerIndex + length); } + /** + * Copy a certain length of bytes from given InputStream + * into this ArrowBuf at the provided index. + * @param index index index (0 based relative to the portion of memory + * this ArrowBuf has access to) + * @param in src stream to copy from + * @param length length of data to copy + * @return number of bytes copied from stream into ArrowBuf + * @throws IOException on failing to read from stream + */ public int setBytes(int index, InputStream in, int length) throws IOException { Preconditions.checkArgument(in != null, "expecting valid input stream"); checkIndex(index, length); @@ -974,6 +996,15 @@ public int setBytes(int index, InputStream in, int length) throws IOException { return readBytes; } + /** + * Copy a certain length of bytes from this ArrowBuf at a given + * index into the given OutputStream. + * @param index index index (0 based relative to the portion of memory + * this ArrowBuf has access to) + * @param out dst stream to copy data into + * @param length length of data to copy + * @throws IOException on failing to write to stream + */ public void getBytes(int index, OutputStream out, int length) throws IOException { Preconditions.checkArgument(out != null, "expecting valid output stream"); checkIndex(index, length); @@ -1039,7 +1070,6 @@ public String toHexString(final int start, final int length) { /** * Get the integer id assigned to this ArrowBuf for debugging purposes. - * * @return integer id */ public long getId() { @@ -1063,24 +1093,50 @@ public void print(StringBuilder sb, int indent, Verbosity verbosity) { } } + /** + * Get the index at which the next byte will be read from. + * @return reader index + */ public int readerIndex() { return readerIndex; } + /** + * Get the index at which next byte will be written to. + * @return writer index + */ public int writerIndex() { return writerIndex; } + /** + * Set the reader index for this ArrowBuf. + * @param readerIndex new reader index + * @return this ArrowBuf + */ public ArrowBuf readerIndex(int readerIndex) { this.readerIndex = readerIndex; return this; } + /** + * Set the writer index for this ArrowBuf. + * @param writerIndex new writer index + * @return this ArrowBuf + */ public ArrowBuf writerIndex(int writerIndex) { this.writerIndex = writerIndex; return this; } + /** + * Zero-out the bytes in this ArrowBuf starting at + * the given index for the given length. + * @param index index index (0 based relative to the portion of memory + * this ArrowBuf has access to) + * @param length length of bytes to zero-out + * @return this ArrowBuf + */ public ArrowBuf setZero(int index, int length) { if (length == 0) { return this; @@ -1088,13 +1144,11 @@ public ArrowBuf setZero(int index, int length) { this.checkIndex(index, length); int nLong = length >>> 3; int nBytes = length & 7; - int i; for (i = nLong; i > 0; --i) { setLong(index, 0L); index += 8; } - if (nBytes == 4) { setInt(index, 0); } else if (nBytes < 4) { @@ -1110,7 +1164,6 @@ public ArrowBuf setZero(int index, int length) { ++index; } } - return this; } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java index 3c8a77a20c332..346c0d4a53769 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java @@ -36,16 +36,8 @@ * fate (same reference count). */ public class BufferLedger implements ValueWithKeyIncluded, ReferenceManager { - // TODO: check this. - // since ArrowBuf interface has been cleaned to just include the length - // of memory chunk it has access to and starting virtual address in the chunk, - // ArrowBuf no longer tracks its offset within the chunk and that info - // is kept by the ReferenceManager/BufferLedger here. thus we need this map - // to track offsets of ArrowBufs managed by this ledger. the downside is - // that there could be potential increase in heap overhead as earlier - // map was created in debug mode only but now it will always be created - // per instance creation of BufferLedger - private final IdentityHashMap buffers = new IdentityHashMap<>(); + private final IdentityHashMap buffers = + BaseAllocator.DEBUG ? new IdentityHashMap<>() : null; private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0); // unique ID assigned to each ledger private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet(); @@ -206,9 +198,7 @@ public void retain(int increment) { * particular length (in bytes) of data in memory chunk. *

* This method is also used as a helper for transferring ownership and retain to target - * allocator and in that case the source ArrowBuf is not associated with the reference - * manager on which we invoke this method so callers can pass null for sourceBuffer - * in such cases. + * allocator. *

* @param sourceBuffer source ArrowBuf * @param index index (relative to source ArrowBuf) new ArrowBuf should be @@ -219,46 +209,44 @@ public void retain(int increment) { */ @Override public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, int index, int length) { - ArrowBuf derivedBuf; - synchronized (buffers) { - // compute the start virtual address in the underlying memory chunk from which the new buffer - // will have access to - final int derivedBufferOffset; - if (sourceBuffer != null) { - // used for slicing where index represents a relative index in the source ArrowBuf - // as the slice start point and that is why we need to add the source buffer offset - // to compute the start virtual address of derived buffer within the underlying chunk - final int srcBufferOffset = buffers.get(sourceBuffer); - derivedBufferOffset = srcBufferOffset + index; - } else { - // used for retain(target allocator) and transferOwnership(target allocator) - // currently where index represents the source buffer offset. these operations - // simply create a new ArrowBuf associated with another combination of allocator - // buffer ledger for the same underlying memory - derivedBufferOffset = index; - } - - final long startAddress = allocationManager.getMemoryChunk().memoryAddress() + derivedBufferOffset; - - // create new ArrowBuf - derivedBuf = new ArrowBuf( - this, - null, - length, // length (in bytes) in the underlying memory chunk for this new ArrowBuf - startAddress, // starting byte address in the underlying memory for this new ArrowBuf - false); + /* + * Usage type 1 for deriveBuffer(): + * Used for slicing where index represents a relative index in the source ArrowBuf + * as the slice start point. This is why we need to add the source buffer offset + * to compute the start virtual address of derived buffer within the + * underlying chunk. + * + * Usage type 2 for deriveBuffer(): + * Used for retain(target allocator) and transferOwnership(target allocator) + * where index is 0 since these operations simply create a new ArrowBuf associated + * with another combination of allocator buffer ledger for the same underlying memory + */ + + // the memory address stored inside ArrowBuf is its starting virtual + // address in the underlying memory chunk from the point it has + // access. so it is already accounting for the offset of the source buffer + // we can simply add the index to get the starting address of new buffer. + final long derivedBufferAddress = sourceBuffer.memoryAddress() + index; + + // create new ArrowBuf + final ArrowBuf derivedBuf = new ArrowBuf( + this, + null, + length, // length (in bytes) in the underlying memory chunk for this new ArrowBuf + derivedBufferAddress, // starting byte address in the underlying memory for this new ArrowBuf, + false); - // store the offset of new buffer - buffers.put(derivedBuf, derivedBufferOffset); - - // logging - if (BaseAllocator.DEBUG) { - historicalLog.recordEvent( - "ArrowBuf(BufferLedger, BufferAllocator[%s], " + - "UnsafeDirectLittleEndian[identityHashCode == " + - "%d](%s)) => ledger hc == %d", + // logging + if (BaseAllocator.DEBUG) { + historicalLog.recordEvent( + "ArrowBuf(BufferLedger, BufferAllocator[%s], " + + "UnsafeDirectLittleEndian[identityHashCode == " + + "%d](%s)) => ledger hc == %d", allocator.name, System.identityHashCode(derivedBuf), derivedBuf.toString(), System.identityHashCode(this)); + + synchronized (buffers) { + buffers.put(derivedBuf, null); } } @@ -286,11 +274,6 @@ ArrowBuf newArrowBuf(final int length, final BufferManager manager) { // create ArrowBuf final ArrowBuf buf = new ArrowBuf(this, manager, length, startAddress, false); - // store the offset (within the underlying memory chunk) of new buffer - synchronized (buffers) { - buffers.put(buf, 0); - } - // logging if (BaseAllocator.DEBUG) { historicalLog.recordEvent( @@ -298,6 +281,10 @@ ArrowBuf newArrowBuf(final int length, final BufferManager manager) { "UnsafeDirectLittleEndian[identityHashCode == " + "%d](%s)) => ledger hc == %d", allocator.name, System.identityHashCode(buf), buf.toString(), System.identityHashCode(this)); + + synchronized (buffers) { + buffers.put(buf, null); + } } return buf; @@ -339,9 +326,8 @@ public ArrowBuf retain(final ArrowBuf srcBuffer, BufferAllocator target) { // and this will be true for all the existing buffers currently managed by targetrefmanager final BufferLedger targetRefManager = allocationManager.associate((BaseAllocator)target); // create a new ArrowBuf to associate with new allocator and target ref manager - final int targetBufOffset = buffers.get(srcBuffer); final int targetBufLength = srcBuffer.capacity(); - ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(null, targetBufOffset, targetBufLength); + ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength); targetArrowBuf.readerIndex(srcBuffer.readerIndex()); targetArrowBuf.writerIndex(srcBuffer.writerIndex()); return targetArrowBuf; @@ -437,9 +423,8 @@ public TransferResult transferOwnership(final ArrowBuf srcBuffer, final BufferAl // and this will be true for all the existing buffers currently managed by targetrefmanager final BufferLedger targetRefManager = allocationManager.associate((BaseAllocator)target); // create a new ArrowBuf to associate with new allocator and target ref manager - final int targetBufOffset = buffers.get(srcBuffer); final int targetBufLength = srcBuffer.capacity(); - final ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(null, targetBufOffset, targetBufLength); + final ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength); targetArrowBuf.readerIndex(srcBuffer.readerIndex()); targetArrowBuf.writerIndex(srcBuffer.writerIndex()); final boolean allocationFit = transferBalance(targetRefManager);