From 758b3dd3cc4f6dfc2dfc12c3a77472d97c31c5d5 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 16 Jul 2024 16:19:30 -0400 Subject: [PATCH] fix: use fast calculation for totalRemaining number of bytes from multiple ByteBuffers (#2633) --- .../storage/GapicUnbufferedReadableByteChannel.java | 4 +--- .../com/google/cloud/storage/RewindableContent.java | 3 +-- .../java/com/google/cloud/storage/ThroughputSink.java | 10 ++++------ 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index 6cecc8dded..ecaa3bc878 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -30,11 +30,9 @@ import com.google.storage.v2.ReadObjectResponse; import java.io.Closeable; import java.io.IOException; -import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.ScatteringByteChannel; -import java.util.Arrays; import java.util.Iterator; final class GapicUnbufferedReadableByteChannel @@ -80,7 +78,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { throw new ClosedChannelException(); } - long totalBufferCapacity = Arrays.stream(dsts).mapToLong(Buffer::remaining).sum(); + long totalBufferCapacity = Buffers.totalRemaining(dsts, offset, length); ReadCursor c = new ReadCursor(blobOffset, blobOffset + totalBufferCapacity); while (c.hasRemaining()) { if (leftovers != null) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java index 03d9cc4627..ef215573fb 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java @@ -22,7 +22,6 @@ import com.google.common.io.ByteStreams; import java.io.IOException; import java.io.OutputStream; -import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.GatheringByteChannel; @@ -173,7 +172,7 @@ private static final class ByteBufferContent extends RewindableContent { private ByteBufferContent(ByteBuffer[] buffers) { this.buffers = buffers; this.positions = Arrays.stream(buffers).mapToInt(Buffers::position).toArray(); - this.totalLength = Arrays.stream(buffers).mapToLong(Buffer::remaining).sum(); + this.totalLength = Buffers.totalRemaining(buffers, 0, buffers.length); this.dirty = false; } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java index 7d6d909dd6..4356df936d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java @@ -18,14 +18,12 @@ import com.google.common.base.MoreObjects; import java.io.IOException; -import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; import java.nio.channels.WritableByteChannel; import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.Arrays; import java.util.Objects; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Logger; @@ -262,7 +260,7 @@ public int write(ByteBuffer src) throws IOException { @Override public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { boolean exception = false; - long available = Arrays.stream(srcs).mapToLong(Buffer::remaining).sum(); + long available = Buffers.totalRemaining(srcs, offset, length); Instant begin = clock.instant(); try { return delegate.write(srcs, offset, length); @@ -271,7 +269,7 @@ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException throw e; } finally { Instant end = clock.instant(); - long remaining = Arrays.stream(srcs).mapToLong(Buffer::remaining).sum(); + long remaining = Buffers.totalRemaining(srcs, offset, length); Record record = Record.of(available - remaining, begin, end, exception); sink.recordThroughput(record); } @@ -280,7 +278,7 @@ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException @Override public long write(ByteBuffer[] srcs) throws IOException { boolean exception = false; - long available = Arrays.stream(srcs).mapToLong(Buffer::remaining).sum(); + long available = Buffers.totalRemaining(srcs, 0, srcs.length); Instant begin = clock.instant(); try { return delegate.write(srcs); @@ -289,7 +287,7 @@ public long write(ByteBuffer[] srcs) throws IOException { throw e; } finally { Instant end = clock.instant(); - long remaining = Arrays.stream(srcs).mapToLong(Buffer::remaining).sum(); + long remaining = Buffers.totalRemaining(srcs, 0, srcs.length); Record record = Record.of(available - remaining, begin, end, exception); sink.recordThroughput(record); }