From 13e515399c9aadc70bc6dd38a07c84c3964c3d47 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Fri, 17 May 2024 13:42:17 -0400 Subject: [PATCH 1/2] fix: reduce Java 21 Virtual Thread Pinning in IO operations --- .../cloud/storage/AsyncAppendingQueue.java | 99 +++++----- .../cloud/storage/BaseStorageReadChannel.java | 122 +++++++----- .../storage/BaseStorageWriteChannel.java | 70 ++++--- .../cloud/storage/BlobReadChannelV2.java | 13 +- .../cloud/storage/BlobWriteChannelV2.java | 46 +++-- ...CompositeUploadBlobWriteSessionConfig.java | 3 +- ...lelCompositeUploadWritableByteChannel.java | 8 +- .../cloud/storage/StorageByteChannels.java | 182 ++++++++++++++---- .../google/cloud/storage/ThroughputSink.java | 12 +- 9 files changed, 372 insertions(+), 183 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncAppendingQueue.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncAppendingQueue.java index 644dd01415..3a9e01c882 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncAppendingQueue.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncAppendingQueue.java @@ -38,6 +38,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.checkerframework.checker.nullness.qual.NonNull; @@ -66,6 +67,7 @@ boolean isOpen() { private final AtomicReference shortCircuitFailure; private final ApiFutureCallback shortCircuitRegistrationCallback; + private final ReentrantLock lock; private volatile State state; private AsyncAppendingQueue( @@ -85,26 +87,32 @@ private AsyncAppendingQueue( shortCircuitFailure.compareAndSet(null, throwable); } }; + lock = new ReentrantLock(); } synchronized AsyncAppendingQueue append(ApiFuture value) throws ShortCircuitException { - checkState(state.isOpen(), "already closed"); - Throwable throwable = shortCircuitFailure.get(); - if (throwable != null) { - ShortCircuitException shortCircuitException = new ShortCircuitException(throwable); - finalResult.cancel(true); - throw shortCircuitException; + lock.lock(); + try { + checkState(state.isOpen(), "already closed"); + Throwable throwable = shortCircuitFailure.get(); + if (throwable != null) { + ShortCircuitException shortCircuitException = new ShortCircuitException(throwable); + finalResult.cancel(true); + throw shortCircuitException; + } + checkNotNull(value, "value must not be null"); + + Element newElement = newElement(value); + queue.offer(newElement); + boolean isFull = queue.size() == maxElementsPerCompact; + if (isFull) { + Element compact = compact(exec); + queue.offer(compact); + } + return this; + } finally { + lock.unlock(); } - checkNotNull(value, "value must not be null"); - - Element newElement = newElement(value); - queue.offer(newElement); - boolean isFull = queue.size() == maxElementsPerCompact; - if (isFull) { - Element compact = compact(exec); - queue.offer(compact); - } - return this; } ApiFuture getResult() { @@ -117,34 +125,39 @@ T await() { @Override public synchronized void close() { - if (!state.isOpen()) { - return; - } - state = State.CLOSING; - - if (queue.isEmpty()) { - NoSuchElementException neverAppendedTo = new NoSuchElementException("Never appended to"); - finalResult.setException(neverAppendedTo); - throw neverAppendedTo; - } else { - Element transform = compact(exec); - - ApiFutures.addCallback( - transform.getValue(), - new ApiFutureCallback() { - @Override - public void onFailure(Throwable err) { - finalResult.setException(err); - } - - @Override - public void onSuccess(T t) { - finalResult.set(t); - } - }, - exec); + lock.lock(); + try { + if (!state.isOpen()) { + return; + } + state = State.CLOSING; + + if (queue.isEmpty()) { + NoSuchElementException neverAppendedTo = new NoSuchElementException("Never appended to"); + finalResult.setException(neverAppendedTo); + throw neverAppendedTo; + } else { + Element transform = compact(exec); + + ApiFutures.addCallback( + transform.getValue(), + new ApiFutureCallback() { + @Override + public void onFailure(Throwable err) { + finalResult.setException(err); + } + + @Override + public void onSuccess(T t) { + finalResult.set(t); + } + }, + exec); + } + state = State.CLOSED; + } finally { + lock.unlock(); } - state = State.CLOSED; } @NonNull diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageReadChannel.java index b6b0a76b70..465d3d6ae2 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageReadChannel.java @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadableByteChannel; +import java.util.concurrent.locks.ReentrantLock; import org.checkerframework.checker.nullness.qual.Nullable; abstract class BaseStorageReadChannel implements StorageReadChannel { @@ -41,42 +42,64 @@ abstract class BaseStorageReadChannel implements StorageReadChannel { private int chunkSize = _2MiB; private BufferHandle bufferHandle; private LazyReadChannel lazyReadChannel; + protected final ReentrantLock lock; protected BaseStorageReadChannel(Decoder objectDecoder) { this.objectDecoder = objectDecoder; this.result = SettableApiFuture.create(); this.open = true; this.byteRangeSpec = ByteRangeSpec.nullRange(); + this.lock = new ReentrantLock(); } @Override - public final synchronized void setChunkSize(int chunkSize) { - StorageException.wrapIOException(() -> maybeResetChannel(true)); - this.chunkSize = chunkSize; + public final void setChunkSize(int chunkSize) { + lock.lock(); + try { + StorageException.wrapIOException(() -> maybeResetChannel(true)); + this.chunkSize = chunkSize; + } finally { + lock.unlock(); + } } @Override - public final synchronized boolean isOpen() { - return open; + public final boolean isOpen() { + lock.lock(); + try { + return open; + } finally { + lock.unlock(); + } } @Override - public final synchronized void close() { - open = false; - if (internalGetLazyChannel().isOpen()) { - ReadableByteChannel channel = internalGetLazyChannel().getChannel(); - StorageException.wrapIOException(channel::close); + public final void close() { + lock.lock(); + try { + open = false; + if (internalGetLazyChannel().isOpen()) { + ReadableByteChannel channel = internalGetLazyChannel().getChannel(); + StorageException.wrapIOException(channel::close); + } + } finally { + lock.unlock(); } } @Override - public final synchronized StorageReadChannel setByteRangeSpec(ByteRangeSpec byteRangeSpec) { + public final StorageReadChannel setByteRangeSpec(ByteRangeSpec byteRangeSpec) { requireNonNull(byteRangeSpec, "byteRangeSpec must be non null"); - if (!this.byteRangeSpec.equals(byteRangeSpec)) { - StorageException.wrapIOException(() -> maybeResetChannel(false)); - this.byteRangeSpec = byteRangeSpec; + lock.lock(); + try { + if (!this.byteRangeSpec.equals(byteRangeSpec)) { + StorageException.wrapIOException(() -> maybeResetChannel(false)); + this.byteRangeSpec = byteRangeSpec; + } + return this; + } finally { + lock.unlock(); } - return this; } @Override @@ -85,42 +108,47 @@ public final ByteRangeSpec getByteRangeSpec() { } @Override - public final synchronized int read(ByteBuffer dst) throws IOException { - // BlobReadChannel only considered itself closed if close had been called on it. - if (!open) { - throw new ClosedChannelException(); - } - long diff = byteRangeSpec.length(); - // the check on beginOffset >= 0 used to be a precondition on seek(long) - // move it here to preserve existing behavior while allowing new negative offsets - if (diff <= 0 && byteRangeSpec.beginOffset() >= 0) { - return -1; - } + public final int read(ByteBuffer dst) throws IOException { + lock.lock(); try { - // trap if the fact that tmp is already closed, and instead return -1 - ReadableByteChannel tmp = internalGetLazyChannel().getChannel(); - if (!tmp.isOpen()) { - return -1; - } - int read = tmp.read(dst); - if (read != -1) { - byteRangeSpec = byteRangeSpec.withShiftBeginOffset(read); + // BlobReadChannel only considered itself closed if close had been called on it. + if (!open) { + throw new ClosedChannelException(); } - return read; - } catch (StorageException e) { - if (e.getCode() == 416) { - // HttpStorageRpc turns 416 into a null etag with an empty byte array, leading - // BlobReadChannel to believe it read 0 bytes, returning -1 and leaving the channel open. - // Emulate that same behavior here to preserve behavior compatibility, though this should - // be removed in the next major version. + long diff = byteRangeSpec.length(); + // the check on beginOffset >= 0 used to be a precondition on seek(long) + // move it here to preserve existing behavior while allowing new negative offsets + if (diff <= 0 && byteRangeSpec.beginOffset() >= 0) { return -1; - } else { - throw new IOException(e); } - } catch (IOException e) { - throw e; - } catch (Exception e) { - throw new IOException(StorageException.coalesce(e)); + try { + // trap if the fact that tmp is already closed, and instead return -1 + ReadableByteChannel tmp = internalGetLazyChannel().getChannel(); + if (!tmp.isOpen()) { + return -1; + } + int read = tmp.read(dst); + if (read != -1) { + byteRangeSpec = byteRangeSpec.withShiftBeginOffset(read); + } + return read; + } catch (StorageException e) { + if (e.getCode() == 416) { + // HttpStorageRpc turns 416 into a null etag with an empty byte array, leading + // BlobReadChannel to believe it read 0 bytes, returning -1 and leaving the channel open. + // Emulate that same behavior here to preserve behavior compatibility, though this should + // be removed in the next major version. + return -1; + } else { + throw new IOException(e); + } + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(StorageException.coalesce(e)); + } + } finally { + lock.unlock(); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageWriteChannel.java index f5f076327c..10f79c8dff 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageWriteChannel.java @@ -30,12 +30,14 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; +import java.util.concurrent.locks.ReentrantLock; import org.checkerframework.checker.nullness.qual.Nullable; abstract class BaseStorageWriteChannel implements StorageWriteChannel { private final Decoder objectDecoder; private final SettableApiFuture result; + protected final ReentrantLock lock; private long position; private boolean open; @@ -55,27 +57,39 @@ abstract class BaseStorageWriteChannel implements StorageWriteChannel { protected BaseStorageWriteChannel(Decoder objectDecoder) { this.objectDecoder = objectDecoder; this.result = SettableApiFuture.create(); + this.lock = new ReentrantLock(); this.open = true; this.chunkSize = _16MiB; this.writeCalledAtLeastOnce = false; } @Override - public final synchronized void setChunkSize(int chunkSize) { - Preconditions.checkArgument(chunkSize > 0, "chunkSize must be > 0, received %d", chunkSize); - Preconditions.checkState( - bufferHandle == null || bufferHandle.position() == 0, - "unable to change chunk size with data buffered"); - this.chunkSize = chunkSize; + public final void setChunkSize(int chunkSize) { + lock.lock(); + try { + Preconditions.checkArgument(chunkSize > 0, "chunkSize must be > 0, received %d", chunkSize); + Preconditions.checkState( + bufferHandle == null || bufferHandle.position() == 0, + "unable to change chunk size with data buffered"); + this.chunkSize = chunkSize; + } finally { + lock.unlock(); + } } @Override - public final synchronized boolean isOpen() { - return open; + public final boolean isOpen() { + lock.lock(); + try { + return open; + } finally { + lock.unlock(); + } } @Override - public final synchronized void close() throws IOException { + public final void close() throws IOException { + lock.lock(); try { if (open && !writeCalledAtLeastOnce) { this.write(ByteBuffer.allocate(0)); @@ -85,28 +99,34 @@ public final synchronized void close() throws IOException { } } finally { open = false; + lock.unlock(); } } @Override - public final synchronized int write(ByteBuffer src) throws IOException { - if (!open) { - throw new ClosedChannelException(); - } - writeCalledAtLeastOnce = true; + public final int write(ByteBuffer src) throws IOException { + lock.lock(); try { - BufferedWritableByteChannel tmp = internalGetLazyChannel().getChannel(); - if (!tmp.isOpen()) { - return 0; + if (!open) { + throw new ClosedChannelException(); } - int write = tmp.write(src); - return write; - } catch (StorageException e) { - throw new IOException(e); - } catch (IOException e) { - throw e; - } catch (Exception e) { - throw new IOException(StorageException.coalesce(e)); + writeCalledAtLeastOnce = true; + try { + BufferedWritableByteChannel tmp = internalGetLazyChannel().getChannel(); + if (!tmp.isOpen()) { + return 0; + } + int write = tmp.write(src); + return write; + } catch (StorageException e) { + throw new IOException(e); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(StorageException.coalesce(e)); + } + } finally { + lock.unlock(); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java index e814387171..6ff9ca6ec8 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java @@ -51,10 +51,15 @@ final class BlobReadChannelV2 extends BaseStorageReadChannel { } @Override - public synchronized RestorableState capture() { - ApiaryReadRequest apiaryReadRequest = getApiaryReadRequest(); - return new BlobReadChannelV2State( - apiaryReadRequest, blobReadChannelContext.getStorageOptions(), getChunkSize()); + public RestorableState capture() { + lock.lock(); + try { + ApiaryReadRequest apiaryReadRequest = getApiaryReadRequest(); + return new BlobReadChannelV2State( + apiaryReadRequest, blobReadChannelContext.getStorageOptions(), getChunkSize()); + } finally { + lock.unlock(); + } } protected LazyReadChannel newLazyReadChannel() { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java index 8b9de3f616..37108c4de2 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java @@ -38,27 +38,32 @@ final class BlobWriteChannelV2 extends BaseStorageWriteChannel { } @Override - public synchronized RestorableState capture() { - final byte[] bufferSnapshot; - BufferHandle handle = getBufferHandle(); - if (handle.position() > 0) { - ByteBuffer byteBuffer = handle.get(); - // duplicate so we don't actually modify the existing instance - ByteBuffer dup = byteBuffer.duplicate(); - dup.flip(); - int remaining = dup.remaining(); - bufferSnapshot = new byte[remaining]; - dup.get(bufferSnapshot); - } else { - bufferSnapshot = new byte[0]; + public RestorableState capture() { + lock.lock(); + try { + final byte[] bufferSnapshot; + BufferHandle handle = getBufferHandle(); + if (handle.position() > 0) { + ByteBuffer byteBuffer = handle.get(); + // duplicate so we don't actually modify the existing instance + ByteBuffer dup = byteBuffer.duplicate(); + dup.flip(); + int remaining = dup.remaining(); + bufferSnapshot = new byte[remaining]; + dup.get(bufferSnapshot); + } else { + bufferSnapshot = new byte[0]; + } + return new BlobWriteChannelV2State( + blobChannelContext.getStorageOptions(), + start, + getCommittedPosition(), + isOpen(), + getChunkSize(), + bufferSnapshot); + } finally { + lock.unlock(); } - return new BlobWriteChannelV2State( - blobChannelContext.getStorageOptions(), - start, - getCommittedPosition(), - isOpen(), - getChunkSize(), - bufferSnapshot); } @Override @@ -80,6 +85,7 @@ protected LazyWriteChannel newLazyWriteChannel() { static final class BlobWriteChannelV2State implements RestorableState, Serializable { + private static final long serialVersionUID = -1901664719924133474L; private final HttpStorageOptions options; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java index c37f3e1abb..33ff739969 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java @@ -1052,7 +1052,8 @@ public ApiFuture openAsync() { storageInternal, info, opts); - return ApiFutures.immediateFuture(channel); + return ApiFutures.immediateFuture( + StorageByteChannels.writable().createSynchronized(channel)); } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java index a0dce2cfe4..58e9bad03d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java @@ -166,7 +166,7 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab } @Override - public synchronized int write(ByteBuffer src) throws IOException { + public int write(ByteBuffer src) throws IOException { if (!open) { throw new ClosedChannelException(); } @@ -190,12 +190,12 @@ public synchronized int write(ByteBuffer src) throws IOException { } @Override - public synchronized boolean isOpen() { + public boolean isOpen() { return open; } @Override - public synchronized void flush() throws IOException { + public void flush() throws IOException { if (current != null) { ByteBuffer buf = current.getBufferHandle().get(); internalFlush(buf); @@ -203,7 +203,7 @@ public synchronized void flush() throws IOException { } @Override - public synchronized void close() throws IOException { + public void close() throws IOException { if (!open) { return; } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageByteChannels.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageByteChannels.java index fb14d2e808..7f02c749c5 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageByteChannels.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageByteChannels.java @@ -25,6 +25,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.ScatteringByteChannel; +import java.util.concurrent.locks.ReentrantLock; final class StorageByteChannels { @@ -74,24 +75,41 @@ private static final class SynchronizedBufferedReadableByteChannel implements BufferedReadableByteChannel { private final BufferedReadableByteChannel delegate; + private final ReentrantLock lock; public SynchronizedBufferedReadableByteChannel(BufferedReadableByteChannel delegate) { this.delegate = delegate; + this.lock = new ReentrantLock(); } @Override - public synchronized int read(ByteBuffer dst) throws IOException { - return delegate.read(dst); + public int read(ByteBuffer dst) throws IOException { + lock.lock(); + try { + return delegate.read(dst); + } finally { + lock.unlock(); + } } @Override public boolean isOpen() { - return delegate.isOpen(); + lock.lock(); + try { + return delegate.isOpen(); + } finally { + lock.unlock(); + } } @Override - public synchronized void close() throws IOException { - delegate.close(); + public void close() throws IOException { + lock.lock(); + try { + delegate.close(); + } finally { + lock.unlock(); + } } } @@ -99,29 +117,51 @@ private static final class SynchronizedBufferedWritableByteChannel implements BufferedWritableByteChannel { private final BufferedWritableByteChannel delegate; + private final ReentrantLock lock; public SynchronizedBufferedWritableByteChannel(BufferedWritableByteChannel delegate) { this.delegate = delegate; + this.lock = new ReentrantLock(); } @Override - public synchronized int write(ByteBuffer src) throws IOException { - return delegate.write(src); + public int write(ByteBuffer src) throws IOException { + lock.lock(); + try { + return delegate.write(src); + } finally { + lock.unlock(); + } } @Override public boolean isOpen() { - return delegate.isOpen(); + lock.lock(); + try { + return delegate.isOpen(); + } finally { + lock.unlock(); + } } @Override - public synchronized void close() throws IOException { - delegate.close(); + public void close() throws IOException { + lock.lock(); + try { + delegate.close(); + } finally { + lock.unlock(); + } } @Override - public synchronized void flush() throws IOException { - delegate.flush(); + public void flush() throws IOException { + lock.lock(); + try { + delegate.flush(); + } finally { + lock.unlock(); + } } } @@ -129,34 +169,61 @@ private static final class SynchronizedUnbufferedReadableByteChannel implements UnbufferedReadableByteChannel { private final UnbufferedReadableByteChannel delegate; + private final ReentrantLock lock; private SynchronizedUnbufferedReadableByteChannel(UnbufferedReadableByteChannel delegate) { this.delegate = delegate; + this.lock = new ReentrantLock(); } @Override - public synchronized int read(ByteBuffer src) throws IOException { - return delegate.read(src); + public int read(ByteBuffer src) throws IOException { + lock.lock(); + try { + return delegate.read(src); + } finally { + lock.unlock(); + } } @Override - public synchronized long read(ByteBuffer[] dsts) throws IOException { - return delegate.read(dsts); + public long read(ByteBuffer[] dsts) throws IOException { + lock.lock(); + try { + return delegate.read(dsts); + } finally { + lock.unlock(); + } } @Override - public synchronized long read(ByteBuffer[] dsts, int offset, int length) throws IOException { - return delegate.read(dsts, offset, length); + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + lock.lock(); + try { + return delegate.read(dsts, offset, length); + } finally { + lock.unlock(); + } } @Override public boolean isOpen() { - return delegate.isOpen(); + lock.lock(); + try { + return delegate.isOpen(); + } finally { + lock.unlock(); + } } @Override - public synchronized void close() throws IOException { - delegate.close(); + public void close() throws IOException { + lock.lock(); + try { + delegate.close(); + } finally { + lock.unlock(); + } } } @@ -164,50 +231,91 @@ private static final class SynchronizedUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel { private final UnbufferedWritableByteChannel delegate; + private final ReentrantLock lock; private SynchronizedUnbufferedWritableByteChannel(UnbufferedWritableByteChannel delegate) { this.delegate = delegate; + this.lock = new ReentrantLock(); } @Override - public synchronized int write(ByteBuffer src) throws IOException { - return delegate.write(src); + public int write(ByteBuffer src) throws IOException { + lock.lock(); + try { + return delegate.write(src); + } finally { + lock.unlock(); + } } @Override - public synchronized long write(ByteBuffer[] srcs) throws IOException { - return delegate.write(srcs); + public long write(ByteBuffer[] srcs) throws IOException { + lock.lock(); + try { + return delegate.write(srcs); + } finally { + lock.unlock(); + } } @Override - public synchronized long write(ByteBuffer[] srcs, int offset, int length) throws IOException { - return delegate.write(srcs, offset, length); + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + lock.lock(); + try { + return delegate.write(srcs, offset, length); + } finally { + lock.unlock(); + } } @Override - public synchronized int writeAndClose(ByteBuffer src) throws IOException { - return delegate.writeAndClose(src); + public int writeAndClose(ByteBuffer src) throws IOException { + lock.lock(); + try { + return delegate.writeAndClose(src); + } finally { + lock.unlock(); + } } @Override - public synchronized long writeAndClose(ByteBuffer[] srcs) throws IOException { - return delegate.writeAndClose(srcs); + public long writeAndClose(ByteBuffer[] srcs) throws IOException { + lock.lock(); + try { + return delegate.writeAndClose(srcs); + } finally { + lock.unlock(); + } } @Override - public synchronized long writeAndClose(ByteBuffer[] srcs, int offset, int length) - throws IOException { - return delegate.writeAndClose(srcs, offset, length); + public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException { + lock.lock(); + try { + return delegate.writeAndClose(srcs, offset, length); + } finally { + lock.unlock(); + } } @Override public boolean isOpen() { - return delegate.isOpen(); + lock.lock(); + try { + return delegate.isOpen(); + } finally { + lock.unlock(); + } } @Override - public synchronized void close() throws IOException { - delegate.close(); + public void close() throws IOException { + lock.lock(); + try { + delegate.close(); + } finally { + lock.unlock(); + } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java index bf9d523b35..7d6d909dd6 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java @@ -27,6 +27,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.Objects; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Logger; /** @@ -348,15 +349,22 @@ public String toString() { final class ThroughputMovingWindowThroughputSink implements ThroughputSink { private final ThroughputMovingWindow w; private final Clock clock; + private final ReentrantLock lock; private ThroughputMovingWindowThroughputSink(ThroughputMovingWindow w, Clock clock) { this.w = w; this.clock = clock; + this.lock = new ReentrantLock(); } @Override - public synchronized void recordThroughput(Record r) { - w.add(r.end, Throughput.of(r.getNumBytes(), r.getDuration())); + public void recordThroughput(Record r) { + lock.lock(); + try { + w.add(r.end, Throughput.of(r.getNumBytes(), r.getDuration())); + } finally { + lock.unlock(); + } } @Override From fff6944130c67824fea23b2504e97e6217b17dc8 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Fri, 17 May 2024 13:58:25 -0400 Subject: [PATCH 2/2] chore: cleanup --- .../java/com/google/cloud/storage/AsyncAppendingQueue.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncAppendingQueue.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncAppendingQueue.java index 3a9e01c882..a00c8d0359 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncAppendingQueue.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncAppendingQueue.java @@ -90,7 +90,7 @@ private AsyncAppendingQueue( lock = new ReentrantLock(); } - synchronized AsyncAppendingQueue append(ApiFuture value) throws ShortCircuitException { + AsyncAppendingQueue append(ApiFuture value) throws ShortCircuitException { lock.lock(); try { checkState(state.isOpen(), "already closed"); @@ -124,7 +124,7 @@ T await() { } @Override - public synchronized void close() { + public void close() { lock.lock(); try { if (!state.isOpen()) {