Skip to content

Commit

Permalink
ARROW-3191: [Java] Make ArrowBuf work with arbitrary underlying memory
Browse files Browse the repository at this point in the history
This patch has the following goals:

(1) Make ArrowBuf work with any arbitrary memory.
(2) Decouple the usage of data get/set in ArrowBuf and memory accounting, reference management, ownership etc.

Changes

(1) A ReferenceManager interface that can be provided to ArrowBuf. This allows the users to provide their own custom implementation of reference management or it can be a NO-OP.
(2) All the accounting, ownership, reference related APIs have been moved to the default implementation of ReferenceManager -- BufferLedger, AllocationManager
(3) ArrowBuf is now literally an abstraction over some user provided underlying memory chunk. All it needs is starting virtual address and length of data to access along with a user provided implementation of ReferenceManager.
(4) ArrowBuf no longer extends or implements any of Netty's buffer interfaces. Thus all of the extra and unused APIs have been removed and it just provides simple get/set.

There is quite a bit of cleanup that needs to be done since some APIs have been moved out of ArrowBuf. So the caller code needs to change. They are likely going to be boilerplate changes but I would like to do them once we have consensus on the major set of changes here and the decoupling between ArrowBuf usage and reference management.

So the code doesn't compile yet because of the above mentioned reason. Secondly, there are a few things that I have removed assuming they are not being used  -- like BufferManager in ArrowBuf. I am still evaluating its usage. So there a few TODOs in code for these reasons.

Raising PR before the code is complete to get feedback on the important set of changes.

Author: siddharth <[email protected]>

Closes apache#4151 from siddharthteotia/ARROW-3191 and squashes the following commits:

6bb9cfc <siddharth> Cleanup
2fa139c <siddharth> integration test issues
283916f <siddharth> Fix integration test issues
47a303b <siddharth> Setting io.netty.tryReflectionSetAccessible to true
76096e5 <siddharth> Refactor NettyArrowBuf
0b9d5d2 <siddharth> Fix test failures happening in non-debug mode and gandiva build errors
2fbb2c5 <siddharth> Fix some test failures and rebase
9e8beb6 <siddharth> Fix test failures, add javadoc and wrapper over ArrowBuf for usage in Netty framework
348be8b <siddharth> Change callers of ArrowBuf APIs to use ReferenceManager interface and fix build issues
68fe274 <siddharth> ARROW-3191: WIP for pointing ArrowBuf to arbitrary memory
  • Loading branch information
siddharthteotia authored and BryanCutler committed May 7, 2019
1 parent 0a5f90a commit 34b9bb2
Show file tree
Hide file tree
Showing 35 changed files with 2,411 additions and 1,289 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
case BODY_TAG:
if (body != null) {
// only read last body.
body.release();
body.getReferenceManager().release();
body = null;
}
int size = readRawVarint32(stream);
Expand Down Expand Up @@ -271,7 +271,7 @@ private InputStream asInputStream(BufferAllocator allocator) {
int size = 0;
List<ByteBuf> allBufs = new ArrayList<>();
for (ArrowBuf b : bufs) {
allBufs.add(b);
allBufs.add(b.asNettyBuffer());
size += b.readableBytes();
// [ARROW-4213] These buffers must be aligned to an 8-byte boundary in order to be readable from C++.
if (b.readableBytes() % 8 != 0) {
Expand All @@ -288,7 +288,7 @@ private InputStream asInputStream(BufferAllocator allocator) {
ArrowBuf initialBuf = allocator.buffer(baos.size());
initialBuf.writeBytes(baos.toByteArray());
final CompositeByteBuf bb = new CompositeByteBuf(allocator.getAsByteBufAllocator(), true, bufs.size() + 1,
ImmutableList.<ByteBuf>builder().add(initialBuf).addAll(allBufs).build());
ImmutableList.<ByteBuf>builder().add(initialBuf.asNettyBuffer()).addAll(allBufs).build());
final ByteBufInputStream is = new DrainableByteBufInputStream(bb);
return is;
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ void releaseRecordBatch(ArrowRecordBatch recordBatch) {
List<ArrowBuf> buffers = recordBatch.getBuffers();
recordBatch.close();
for (ArrowBuf buf : buffers) {
buf.release();
buf.getReferenceManager().release();
}
}

Expand Down
Loading

0 comments on commit 34b9bb2

Please sign in to comment.