GH-40039: [Java][FlightRPC] Improve performance by removing unnecessary memory copies (#40042)

### Rationale for this change
Described in detail in the issue: apache/arrow#40039

Summary: the `ArrowMessage` class uses `CompositeByteBuf` to avoid memory copies, but its `maxNumComponents` is calculated incorrectly. As a result, memory copies are still performed, which significantly hurts server performance.
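For context, Netty's `CompositeByteBuf` consolidates (memory-copies) its components once their count exceeds `maxNumComponents`. The following is a minimal toy model of that rule, not Netty's actual implementation; the class and field names are illustrative. It shows why a limit sized only for the initial buffers forces a copy as soon as one more component (e.g. a small header appended later) arrives:

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Toy model of CompositeByteBuf's consolidation rule: once the number of
// components exceeds maxNumComponents, everything is merged into a single
// buffer via a memory copy. Illustrative only; not Netty code.
class ToyCompositeBuf {
  private final int maxNumComponents;
  private final List<byte[]> components = new ArrayList<>();
  int copies = 0; // how many consolidations (memory copies) happened

  ToyCompositeBuf(int maxNumComponents) {
    this.maxNumComponents = maxNumComponents;
  }

  void addComponent(byte[] buf) {
    components.add(buf);
    if (components.size() > maxNumComponents) {
      consolidate();
    }
  }

  private void consolidate() {
    ByteArrayOutputStream merged = new ByteArrayOutputStream();
    for (byte[] c : components) {
      merged.write(c, 0, c.length); // the copy we want to avoid
    }
    components.clear();
    components.add(merged.toByteArray());
    copies++;
  }

  int numComponents() {
    return components.size();
  }

  public static void main(String[] args) {
    // Limit sized "just right" for 3 initial buffers: a 4th component
    // (think: a small header appended later) forces a consolidation.
    ToyCompositeBuf tight = new ToyCompositeBuf(3);
    for (int i = 0; i < 4; i++) {
      tight.addComponent(new byte[8]);
    }
    System.out.println("tight: copies=" + tight.copies);   // 1

    // An effectively unreachable limit keeps every component separate.
    ToyCompositeBuf roomy = new ToyCompositeBuf(Integer.MAX_VALUE);
    for (int i = 0; i < 4; i++) {
      roomy.addComponent(new byte[8]);
    }
    System.out.println("roomy: copies=" + roomy.copies);   // 0
  }
}
```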

### What changes are included in this PR?
Change `maxNumComponents` to `Integer.MAX_VALUE`, because we never want to silently merge large buffers into one.

Users can set useZeroCopy=false (the default), in which case the library copies data into a new buffer before handing it to Netty for the write.

### Are these changes tested?

**TestPerf: 30% throughput boost**
```
BEFORE
Transferred 100000000 records totaling 3200000000 bytes at 877.812629 MiB/s. 28764164.218015 record/s. 7024.784185 batch/s.

AFTER
Transferred 100000000 records totaling 3200000000 bytes at 1145.333893 MiB/s. 37530301.022096 record/s. 9165.650116 batch/s.
```

I also tested with a simple client-server application and saw an even more significant performance boost when padding isn't needed.

Two tests with zero-copy set to true:
**50 batches, 30 columns (Int32), 199999 rows in each batch**
- before change: throughput ~25Gbit/s (memory copy happens in `grpc-nio-worker-ELG-*`)
- after change: throughput ~32Gbit/s (20% boost)

**50 batches, 30 columns (Int32), 200k rows in each batch**
- before change: throughput ~15Gbit/s (much slower than with 199999 rows, because the memory copy happens in the `flight-server-default-executor-*` thread and blocks the server from writing the next batch)
- after change: throughput ~32Gbit/s (**115% boost**)
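The 199999-vs-200k difference comes down to buffer alignment: a 199999-row Int32 buffer is 799996 bytes and is not a multiple of 8, so the writer needs padding, while 200000 rows give 800000 bytes, which is already aligned. A quick sanity check of that arithmetic, assuming Arrow IPC's 8-byte minimum alignment (this helper is illustrative, not the library's actual padding code):

```java
// Compute how many padding bytes an Int32 data buffer needs to reach
// 8-byte alignment (Arrow IPC's minimum). Illustrative helper only.
class PaddingCheck {
  static long paddingFor(long rows, long bytesPerValue) {
    long size = rows * bytesPerValue;
    long rem = size % 8;
    return rem == 0 ? 0 : 8 - rem;
  }

  public static void main(String[] args) {
    // 199999 * 4 = 799996 bytes -> 4 padding bytes needed
    System.out.println("199999 rows: " + paddingFor(199_999, 4) + " padding bytes");
    // 200000 * 4 = 800000 bytes -> already aligned, no padding
    System.out.println("200000 rows: " + paddingFor(200_000, 4) + " padding bytes");
  }
}
```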
* Closes: #40039

Authored-by: Lev Tolmachev <[email protected]>
Signed-off-by: David Li <[email protected]>
tolmalev authored and kou committed Nov 25, 2024
1 parent f1a197c commit ef0071e
Showing 1 changed file with 16 additions and 1 deletion.
```diff
@@ -429,11 +429,26 @@ private InputStream asInputStream() {
       ByteBuf initialBuf = Unpooled.buffer(baos.size());
       initialBuf.writeBytes(baos.toByteArray());
       final CompositeByteBuf bb;
-      final int maxNumComponents = Math.max(2, bufs.size() + 1);
       final ImmutableList<ByteBuf> byteBufs = ImmutableList.<ByteBuf>builder()
           .add(initialBuf)
           .addAll(allBufs)
           .build();
+      // See: https://github.com/apache/arrow/issues/40039
+      // CompositeByteBuf requires us to pass maxNumComponents to the constructor.
+      // This number is used to decide when to stop adding new components as separate buffers
+      // and instead merge existing components into a new buffer by performing a memory copy.
+      // We want to avoid memory copies as much as possible, so we set a limit that won't be reached.
+      // At first glance it seems reasonable to set the limit to byteBufs.size() + 1,
+      // because that is enough to avoid merges of the byteBufs we pass to the constructor.
+      // But later this buffer is written to the socket by Netty,
+      // and DefaultHttp2ConnectionEncoder uses CoalescingBufferQueue to combine small buffers into one.
+      // CoalescingBufferQueue.compose checks whether the current buffer is already a CompositeByteBuf,
+      // and if so it just adds a new component to that buffer.
+      // In our case, if we set maxNumComponents = byteBufs.size() + 1, the merge happens on the first
+      // attempt to write data to the socket, because the header message is small and Netty will always
+      // try to combine it with the large CompositeByteBuf we're creating here.
+      // We never want additional memory copies, so we set the limit to Integer.MAX_VALUE.
+      final int maxNumComponents = Integer.MAX_VALUE;
       if (tryZeroCopyWrite) {
         bb = new ArrowBufRetainingCompositeByteBuf(maxNumComponents, byteBufs, bufs);
       } else {
```
