From 76492c563e6533328b81958065add63b31f87dd0 Mon Sep 17 00:00:00 2001
From: Lev Tolmachev
Date: Mon, 12 Feb 2024 14:27:44 +0000
Subject: [PATCH] GH-40039: [Java][FlightRPC] Improve performance by removing unnecessary memory copies (#40042)

### Rationale for this change
Described in detail in the issue: https://github.com/apache/arrow/issues/40039

Summary: class ArrowMessage uses CompositeByteBuf to avoid memory copies, but `maxNumComponents` for it is calculated incorrectly. As a result, memory copies are still performed, which significantly affects the performance of the server.

### What changes are included in this PR?
Changing maxNumComponents to `Integer.MAX_VALUE` because we never want to silently merge large buffers into one.

Users can set useZeroCopy=false (default) and then the library will copy data into a new buffer before sending it to Netty for write.

### Are these changes tested?

**TestPerf: 30% throughput boost**
```
BEFORE
Transferred 100000000 records totaling 3200000000 bytes at 877.812629 MiB/s. 28764164.218015 record/s. 7024.784185 batch/s.

AFTER
Transferred 100000000 records totaling 3200000000 bytes at 1145.333893 MiB/s. 37530301.022096 record/s. 9165.650116 batch/s.
```

Also tested with a simple client-server application, where I saw an even more significant performance boost if padding isn't needed.

Two tests with zero-copy set to true:

**50 batches, 30 columns (Int32), 199999 rows in each batch**
- before change: throughput ~25Gbit/s (memory copy happens in `grpc-nio-worker-ELG-*`)
- after change: throughput ~32Gbit/s (20% boost)

**50 batches, 30 columns (Int32), 200k rows in each batch**
- before change: throughput ~15Gbit/s (much slower than with 199999 because memory copy happens in `flight-server-default-executor-*` thread and blocks the server from writing the next batch)
- after change: throughput ~32Gbit/s (**115% boost**)

* Closes: #40039

Authored-by: Lev Tolmachev
Signed-off-by: David Li
---
 .../org/apache/arrow/flight/ArrowMessage.java | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
index 46cb282e9f3ce..5b946932f39f2 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
@@ -429,11 +429,26 @@ private InputStream asInputStream() {
       ByteBuf initialBuf = Unpooled.buffer(baos.size());
       initialBuf.writeBytes(baos.toByteArray());
       final CompositeByteBuf bb;
-      final int maxNumComponents = Math.max(2, bufs.size() + 1);
       final ImmutableList byteBufs = ImmutableList.builder()
           .add(initialBuf)
           .addAll(allBufs)
           .build();
+      // See: https://github.com/apache/arrow/issues/40039
+      // CompositeByteBuf requires us to pass maxNumComponents to constructor.
+      // This number will be used to decide when to stop adding new components as separate buffers
+      // and instead merge existing components into a new buffer by performing a memory copy.
+      // We want to avoid memory copies as much as possible so we want to set the limit that won't be reached.
+      // At first glance it seems reasonable to set limit to byteBufs.size() + 1,
+      // because it will be enough to avoid merges of byteBufs that we pass to constructor.
+      // But later this buffer will be written to socket by Netty
+      // and DefaultHttp2ConnectionEncoder uses CoalescingBufferQueue to combine small buffers into one.
+      // Method CoalescingBufferQueue.compose will check if current buffer is already a CompositeByteBuf
+      // and if it's the case it will just add a new component to this buffer.
+      // But in our case, if we set maxNumComponents=byteBufs.size() + 1, the merge will happen on the first attempt
+      // to write data to socket because header message is small and Netty will always try to combine it with the
+      // large CompositeByteBuf we're creating here.
+      // We never want additional memory copies so setting the limit to Integer.MAX_VALUE
+      final int maxNumComponents = Integer.MAX_VALUE;
       if (tryZeroCopyWrite) {
         bb = new ArrowBufRetainingCompositeByteBuf(maxNumComponents, byteBufs, bufs);
       } else {
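
The consolidation rule described in the code comment can be illustrated without Netty. The sketch below is a toy model written for illustration, not Netty's `CompositeByteBuf`: the class `ToyCompositeBuffer`, the helper `bytesCopied`, the buffer sizes, and the number of small components appended later are all assumptions. It relies only on the rule stated in the comment: once the number of components exceeds `maxNumComponents`, the existing components are merged into one new buffer by a memory copy.

```java
import java.util.ArrayList;
import java.util.List;

class CompositeLimitDemo {

    /** Toy stand-in for a composite buffer. Hypothetical; NOT Netty's CompositeByteBuf. */
    static final class ToyCompositeBuffer {
        private final int maxNumComponents;
        private final List<byte[]> components = new ArrayList<>();
        private long bytesCopied = 0;

        ToyCompositeBuffer(int maxNumComponents) {
            this.maxNumComponents = maxNumComponents;
        }

        /** Stores a reference to the component; merges everything once the limit is exceeded. */
        void addComponent(byte[] component) {
            components.add(component); // no copy here, only a reference is kept
            if (components.size() > maxNumComponents) {
                consolidate();
            }
        }

        /** Copies all components into one new array and counts the bytes moved. */
        private void consolidate() {
            int total = 0;
            for (byte[] c : components) {
                total += c.length;
            }
            byte[] merged = new byte[total];
            int offset = 0;
            for (byte[] c : components) {
                System.arraycopy(c, 0, merged, offset, c.length);
                offset += c.length;
            }
            bytesCopied += total;
            components.clear();
            components.add(merged);
        }

        long bytesCopied() {
            return bytesCopied;
        }
    }

    /** Builds a composite the way the patch context does, then appends small buffers later. */
    static long bytesCopied(int maxNumComponents, int dataBuffers, int bufferSize, int laterAppends) {
        ToyCompositeBuffer bb = new ToyCompositeBuffer(maxNumComponents);
        bb.addComponent(new byte[16]); // stands in for initialBuf (assumed size)
        for (int i = 0; i < dataBuffers; i++) {
            bb.addComponent(new byte[bufferSize]); // stands in for one data buffer
        }
        for (int i = 0; i < laterAppends; i++) {
            bb.addComponent(new byte[9]); // small buffer appended downstream (assumed size and count)
        }
        return bb.bytesCopied();
    }

    public static void main(String[] args) {
        int dataBuffers = 30;
        int tightLimit = (1 + dataBuffers) + 1; // the "byteBufs.size() + 1" idea from the comment
        System.out.println("tight limit: "
            + bytesCopied(tightLimit, dataBuffers, 1024, 2) + " bytes copied");
        System.out.println("Integer.MAX_VALUE: "
            + bytesCopied(Integer.MAX_VALUE, dataBuffers, 1024, 2) + " bytes copied");
    }
}
```

Under these assumed sizes the tight limit ends up copying every byte held by the composite (30754 bytes), while `Integer.MAX_VALUE` copies nothing. The cost of a limit that can be reached grows with the size of the batch, which is consistent with the larger gains reported for the bigger client-server tests.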