From a16e88c308a9c4eca351cf0217335bb1194ab05a Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 16 Nov 2022 11:57:00 +0100 Subject: [PATCH] [fix] Fix memory leak while Offloading ledgers - BlockAwareSegmentInputStreamImpl never releases the BookKeeper entries in the close method --- .../impl/BlockAwareSegmentInputStreamImpl.java | 2 +- .../impl/BlockAwareSegmentInputStreamTest.java | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index 5f778cab51fd0..d07fbdb92477b 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -278,7 +278,7 @@ public void close() throws IOException { // And through debug, writeBlobStore.uploadMultipartPart in the offload method also will trigger // the close method. // So we add the close variable to avoid release paddingBuf twice. - if (!close.compareAndSet(false, true)) { + if (close.compareAndSet(false, true)) { super.close(); dataBlockHeaderStream.close(); if (!entriesByteBuf.isEmpty()) { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index 47989e0360524..5ca4d6da20bee 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -30,6 +30,7 @@ import io.netty.buffer.Unpooled; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; @@ -798,4 +799,18 @@ public void testReadTillLacWithSmallBuffer() throws Exception { inputStream.close(); } + + @Test + public void testCloseReleaseResources() throws Exception { + ReadHandle readHandle = new MockReadHandle(1, 10, 10); + + BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, 1024); + inputStream.read(); + Field field = BlockAwareSegmentInputStreamImpl.class.getDeclaredField("paddingBuf"); + field.setAccessible(true); + ByteBuf paddingBuf = (ByteBuf) field.get(inputStream); + assertEquals(1, paddingBuf.refCnt()); + inputStream.close(); + assertEquals(0, paddingBuf.refCnt()); + } }