From d869a968b36cd99cbab7b35ab7be9f87a7f70ec0 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Thu, 7 Apr 2022 15:12:02 +0800 Subject: [PATCH 01/13] [imporve][tiered storage] Reduce cpu usage when offloading the ledger --- *Motivation* When offloading a ledger, the BlockAwareSegmentInputStreamImpl will wrap the ledger handler and make it can stream output. Then the JCloud will read the stream as the payload and upload to the storage. In the JCloud implementation, it read the stream with a buffer https://github.com/apache/jclouds/blob/36f351cd18925d2bb27bf7ad2c5d75e555da377a/core/src/main/java/org/jclouds/io/ByteStreams2.java#L68 In the current offload implementation, the read will call multiple times to construct the buffer and then return the data. After implement the read(byte[] b, int off, int len), the cpu usage reduced almost 10%. *Modifications* - Add read(byte[] b, int off, int len) implementation in the BlockAwareSegmentInputStreamImpl --- .../BlockAwareSegmentInputStreamImpl.java | 83 +++++++ .../BlockAwareSegmentInputStreamTest.java | 219 +++++++++++++++--- 2 files changed, 269 insertions(+), 33 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index a4ffdea65098f..3a31ca0de3494 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import java.io.IOException; @@ -44,6 +45,7 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class); static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD }; + static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD); private final ReadHandle ledger; private final long startEntryId; @@ -76,6 +78,52 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in this.entriesByteBuf = Lists.newLinkedList(); } + private int currentOffset = 0; + + private byte[] readEntries(int len) throws IOException { + checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); + checkState(bytesReadOffset < blockSize); + + // once reach the end of entry buffer, read more, if there is more + if (bytesReadOffset < dataBlockFullOffset + && entriesByteBuf.isEmpty() + && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) { + entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); + } + + if (!entriesByteBuf.isEmpty() + && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) { + // always read from the first ByteBuf in the list, once read all of its content remove it. + ByteBuf entryByteBuf = entriesByteBuf.get(0); + int readableBytes = entryByteBuf.readableBytes(); + int read = Math.min(readableBytes, len); + byte[] buf = new byte[read]; + entryByteBuf.getBytes(currentOffset, buf); + currentOffset += read; + entryByteBuf.readerIndex(currentOffset); + bytesReadOffset += read; + + if (entryByteBuf.readableBytes() == 0) { + entryByteBuf.release(); + entriesByteBuf.remove(0); + blockEntryCount++; + currentOffset = 0; + } + + return buf; + } else { + // no space for a new entry or there are no more entries + // set data block full, return end padding + if (dataBlockFullOffset == blockSize) { + dataBlockFullOffset = bytesReadOffset; + } + return new byte[]{ + BLOCK_END_PADDING_BYTES[(bytesReadOffset++ - dataBlockFullOffset) % BLOCK_END_PADDING_BYTES.length] + }; + + } + } + // read ledger entries. private int readEntries() throws IOException { checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); @@ -143,6 +191,41 @@ private List readNextEntriesFromLedger(long start, long maxNumberEntrie } } + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (len == 0) { + return 0; + } + + int offset = off; + int readLen = len; + int readBytes = 0; + // reading header + if (dataBlockHeaderStream.available() > 0) { + int read = dataBlockHeaderStream.read(b, off, len); + offset += read; + readLen -= readLen; + readBytes += read; + bytesReadOffset += read; + } + if (readLen == 0) { + return readBytes; + } + + // reading ledger entries + if (bytesReadOffset < blockSize) { + byte[] readEntries = readEntries(readLen); + for (int i = 0; i < readEntries.length; i++) { + b[offset + i] = readEntries[i]; + readBytes++; + } + return readBytes; + } + + // reached end + return -1; + } + @Override public int read() throws IOException { // reading header diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index 5cf6bd5650003..c6a38e880d6b8 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; @@ -44,6 +45,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import org.testng.collections.Lists; @@ -206,8 +208,16 @@ public CompletableFuture closeAsync() { } } - @Test - public void testHaveEndPadding() throws Exception { + @DataProvider(name = "useBufferRead") + public static Object[][] useBufferRead() { + return new Object[][]{ + {Boolean.TRUE}, + {Boolean.FALSE} + }; + } + + @Test(dataProvider = "useBufferRead") + public void testHaveEndPadding(boolean useBufferRead) throws Exception { int ledgerId = 1; int entrySize = 8; int lac = 160; @@ -226,7 +236,12 @@ public void testHaveEndPadding() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -240,9 +255,18 @@ public void testHaveEndPadding() throws Exception { byte lengthBuf[] = new byte[4]; byte entryIdBuf[] = new byte[8]; byte content[] = new byte[entrySize]; - inputStream.read(lengthBuf); - inputStream.read(entryIdBuf); - inputStream.read(content); + if (useBufferRead) { + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + read = inputStream.read(content, 0, entrySize); + assertEquals(read, entrySize); + } else { + inputStream.read(lengthBuf); + inputStream.read(entryIdBuf); + inputStream.read(content); + } assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); assertEquals(i, Longs.fromByteArray(entryIdBuf)); @@ -256,13 +280,35 @@ public void testHaveEndPadding() throws Exception { int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() - expectedEntryCount * (entrySize + 4 + 8); assertEquals(left, 5); byte padding[] = new byte[left]; - inputStream.read(padding); + if (useBufferRead) { + int ret = 0; + int offset = 0; + while ((ret = inputStream.read(padding, offset, padding.length - ret)) > 0) { + offset += ret; + } + } else { + int len = left; + int offset = 0; + byte[] buf = new byte[4]; + while (len > 0) { + int ret = inputStream.read(buf); + for (int i = 0; i < ret; i++) { + padding[offset++] = buf[i]; + } + len -= ret; + } + } ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding); IntStream.range(0, paddingBuf.capacity()/4).forEach(i -> assertEquals(Integer.toHexString(paddingBuf.readInt()), Integer.toHexString(0xFEDCDEAD))); // 4. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount); @@ -272,8 +318,8 @@ public void testHaveEndPadding() throws Exception { inputStream.close(); } - @Test - public void testNoEndPadding() throws Exception { + @Test(dataProvider = "useBufferRead") + public void testNoEndPadding(boolean useBufferRead) throws Exception { int ledgerId = 1; int entrySize = 8; int lac = 120; @@ -293,7 +339,12 @@ public void testNoEndPadding() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -307,9 +358,18 @@ public void testNoEndPadding() throws Exception { byte lengthBuf[] = new byte[4]; byte entryIdBuf[] = new byte[8]; byte content[] = new byte[entrySize]; - inputStream.read(lengthBuf); - inputStream.read(entryIdBuf); - inputStream.read(content); + if (useBufferRead) { + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + read = inputStream.read(content, 0, entrySize); + assertEquals(read, entrySize); + } else { + inputStream.read(lengthBuf); + inputStream.read(entryIdBuf); + inputStream.read(content); + } assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); assertEquals(i, Longs.fromByteArray(entryIdBuf)); @@ -324,6 +384,11 @@ public void testNoEndPadding() throws Exception { assertEquals(left, 0); // 4. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount); @@ -333,8 +398,8 @@ public void testNoEndPadding() throws Exception { inputStream.close(); } - @Test - public void testReadTillLac() throws Exception { + @Test(dataProvider = "useBufferRead") + public void testReadTillLac(boolean useBufferRead) throws Exception { // simulate last data block read. int ledgerId = 1; int entrySize = 8; @@ -354,7 +419,12 @@ public void testReadTillLac() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -368,9 +438,18 @@ public void testReadTillLac() throws Exception { byte lengthBuf[] = new byte[4]; byte entryIdBuf[] = new byte[8]; byte content[] = new byte[entrySize]; - inputStream.read(lengthBuf); - inputStream.read(entryIdBuf); - inputStream.read(content); + if (useBufferRead) { + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + read = inputStream.read(content, 0, entrySize); + assertEquals(read, entrySize); + } else { + inputStream.read(lengthBuf); + inputStream.read(entryIdBuf); + inputStream.read(content); + } assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); assertEquals(i, Longs.fromByteArray(entryIdBuf)); @@ -385,6 +464,11 @@ public void testReadTillLac() throws Exception { assertEquals(left, 0); // 4. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount); @@ -394,8 +478,8 @@ public void testReadTillLac() throws Exception { inputStream.close(); } - @Test - public void testNoEntryPutIn() throws Exception { + @Test(dataProvider = "useBufferRead") + public void testNoEntryPutIn(boolean useBufferRead) throws Exception { // simulate first entry size over the block size budget, it shouldn't be added. // 2 entries, each with bigger size than block size, so there should no entry added into block. int ledgerId = 1; @@ -416,7 +500,12 @@ public void testNoEntryPutIn() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -424,13 +513,35 @@ public void testNoEntryPutIn() throws Exception { // 2. since no entry put in, it should only get padding after header. byte padding[] = new byte[blockSize - DataBlockHeaderImpl.getDataStartOffset()]; - inputStream.read(padding); + if (useBufferRead) { + int ret = 0; + int offset = 0; + while ((ret = inputStream.read(padding, offset, padding.length - ret)) > 0) { + offset += ret; + } + } else { + int len = padding.length; + int offset = 0; + byte[] buf = new byte[4]; + while (len > 0) { + int ret = inputStream.read(buf); + for (int i = 0; i < ret; i++) { + padding[offset++] = buf[i]; + } + len -= ret; + } + } ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding); IntStream.range(0, paddingBuf.capacity()/4).forEach(i -> assertEquals(Integer.toHexString(paddingBuf.readInt()), Integer.toHexString(0xFEDCDEAD))); // 3. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), 0); @@ -440,8 +551,8 @@ public void testNoEntryPutIn() throws Exception { inputStream.close(); } - @Test - public void testPaddingOnLastBlock() throws Exception { + @Test(dataProvider = "useBufferRead") + public void testPaddingOnLastBlock(boolean useBufferRead) throws Exception { int ledgerId = 1; int entrySize = 1000; int lac = 0; @@ -460,7 +571,12 @@ public void testPaddingOnLastBlock() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -474,9 +590,18 @@ public void testPaddingOnLastBlock() throws Exception { byte lengthBuf[] = new byte[4]; byte entryIdBuf[] = new byte[8]; byte content[] = new byte[entrySize]; - inputStream.read(lengthBuf); - inputStream.read(entryIdBuf); - inputStream.read(content); + if (useBufferRead) { + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + read = inputStream.read(content, 0, entrySize); + assertEquals(read, entrySize); + } else { + inputStream.read(lengthBuf); + inputStream.read(entryIdBuf); + inputStream.read(content); + } assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); assertEquals(i, Longs.fromByteArray(entryIdBuf)); @@ -490,13 +615,35 @@ public void testPaddingOnLastBlock() throws Exception { int consumedBytes = DataBlockHeaderImpl.getDataStartOffset() + expectedEntryCount * (entrySize + BlockAwareSegmentInputStreamImpl.ENTRY_HEADER_SIZE); byte padding[] = new byte[blockSize - consumedBytes]; - inputStream.read(padding); + if (useBufferRead) { + int ret = 0; + int offset = 0; + while ((ret = inputStream.read(padding, offset, padding.length - ret)) > 0) { + offset += ret; + } + } else { + int len = blockSize - consumedBytes; + int offset = 0; + byte[] buf = new byte[4]; + while (len > 0) { + int ret = inputStream.read(buf); + for (int i = 0; i < ret; i++) { + padding[offset++] = buf[i]; + } + len -= ret; + } + } ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding); IntStream.range(0, paddingBuf.capacity()/4).forEach(i -> assertEquals(Integer.toHexString(paddingBuf.readInt()), Integer.toHexString(0xFEDCDEAD))); // 3. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), 1); @@ -506,8 +653,8 @@ public void testPaddingOnLastBlock() throws Exception { inputStream.close(); } - @Test - public void testOnlyNegativeOnEOF() throws Exception { + @Test(dataProvider = "useBufferRead") + public void testOnlyNegativeOnEOF(boolean useBufferRead) throws Exception { int ledgerId = 1; int entrySize = 10000; int lac = 0; @@ -520,7 +667,13 @@ public void testOnlyNegativeOnEOF() throws Exception { int bytesRead = 0; for (int i = 0; i < blockSize*2; i++) { - int ret = inputStream.read(); + int ret; + if (useBufferRead) { + byte[] b = new byte[1]; + ret = inputStream.read(b, 0, b.length); + } else { + ret = inputStream.read(); + } if (ret < 0) { // should only be EOF assertEquals(bytesRead, blockSize); break; From 9cd00c4c35b72f12e91514cf517773f47a141863 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Fri, 8 Apr 2022 08:41:47 +0800 Subject: [PATCH 02/13] Fix the failure ci --- .../offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index c6a38e880d6b8..c22840ac6eae8 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -28,7 +28,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; From e5cf090493674da75e086b86ee3755072d5b2bd6 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Fri, 8 Apr 2022 17:17:16 +0800 Subject: [PATCH 03/13] Address comment --- .../offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index 3a31ca0de3494..7705c1728c221 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -45,7 +45,7 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class); static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD }; - static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD); + static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD); private final ReadHandle ledger; private final long startEntryId; @@ -204,7 +204,7 @@ public int read(byte[] b, int off, int len) throws IOException { if (dataBlockHeaderStream.available() > 0) { int read = dataBlockHeaderStream.read(b, off, len); offset += read; - readLen -= readLen; + readLen -= read; readBytes += read; bytesReadOffset += read; } From 3d33df8756ee0a90818c77da6a1abef7d1a323ea Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 12 Apr 2022 08:58:41 +0800 Subject: [PATCH 04/13] Address comments --- .../BlockAwareSegmentInputStreamImpl.java | 6 ++- .../BlockAwareSegmentInputStreamTest.java | 39 +++++++++++++------ 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index 7705c1728c221..9529f76ead7e2 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -193,7 +193,11 @@ private List readNextEntriesFromLedger(long start, long maxNumberEntrie @Override public int read(byte[] b, int off, int len) throws IOException { - if (len == 0) { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { return 0; } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index c22840ac6eae8..b311941fe7e60 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; @@ -282,7 +283,7 @@ public void testHaveEndPadding(boolean useBufferRead) throws Exception { if (useBufferRead) { int ret = 0; int offset = 0; - while ((ret = inputStream.read(padding, offset, padding.length - ret)) > 0) { + while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) { offset += ret; } } else { @@ -515,7 +516,7 @@ public void testNoEntryPutIn(boolean useBufferRead) throws Exception { if (useBufferRead) { int ret = 0; int offset = 0; - while ((ret = inputStream.read(padding, offset, padding.length - ret)) > 0) { + while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) { offset += ret; } } else { @@ -617,7 +618,7 @@ public void testPaddingOnLastBlock(boolean useBufferRead) throws Exception { if (useBufferRead) { int ret = 0; int offset = 0; - while ((ret = inputStream.read(padding, offset, padding.length - ret)) > 0) { + while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) { offset += ret; } } else { @@ -652,8 +653,8 @@ public void testPaddingOnLastBlock(boolean useBufferRead) throws Exception { inputStream.close(); } - @Test(dataProvider = "useBufferRead") - public void testOnlyNegativeOnEOF(boolean useBufferRead) throws Exception { + @Test() + public void testOnlyNegativeOnEOF() throws Exception { int ledgerId = 1; int entrySize = 10000; int lac = 0; @@ -666,13 +667,7 @@ public void testOnlyNegativeOnEOF(boolean useBufferRead) throws Exception { int bytesRead = 0; for (int i = 0; i < blockSize*2; i++) { - int ret; - if (useBufferRead) { - byte[] b = new byte[1]; - ret = inputStream.read(b, 0, b.length); - } else { - ret = inputStream.read(); - } + int ret = inputStream.read(); if (ret < 0) { // should only be EOF assertEquals(bytesRead, blockSize); break; @@ -682,4 +677,24 @@ public void testOnlyNegativeOnEOF(boolean useBufferRead) throws Exception { } } + @Test + public void testOnlyNegativeOnEOFWithBufferedRead() throws IOException { + int ledgerId = 1; + int entrySize = 10000; + int lac = 0; + + Random r = new Random(0); + ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac, () -> (byte)r.nextInt()); + + int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 2; + BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize); + + int bytesRead = 0; + int ret; + byte[] buf = new byte[1024]; + while ((ret = inputStream.read(buf, 0, buf.length)) > 0) { + bytesRead += ret; + } + assertEquals(bytesRead, blockSize); + } } From c509c46f7e1a145dea1105d93f7e0d20b31598e2 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 12 Apr 2022 09:08:08 +0800 Subject: [PATCH 05/13] remove unused changes --- .../offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index b311941fe7e60..1f37bad514dec 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -653,7 +653,7 @@ public void testPaddingOnLastBlock(boolean useBufferRead) throws Exception { inputStream.close(); } - @Test() + @Test public void testOnlyNegativeOnEOF() throws Exception { int ledgerId = 1; int entrySize = 10000; From cd263ff6ffbeea83a8e256d33aee85323f9c2e8a Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Thu, 21 Apr 2022 09:21:15 +0800 Subject: [PATCH 06/13] Using ByteBuf to avoid new bytes every time --- .../BlockAwareSegmentInputStreamImpl.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index 9529f76ead7e2..1909ba234796a 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -47,6 +47,8 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD }; static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD); + private static final ByteBuf PADDING_BUF = PulsarByteBufAllocator.DEFAULT.buffer(1, 1); + private final ReadHandle ledger; private final long startEntryId; private final int blockSize; @@ -80,7 +82,7 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in private int currentOffset = 0; - private byte[] readEntries(int len) throws IOException { + private ByteBuf readEntries(int len) throws IOException { checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); checkState(bytesReadOffset < blockSize); @@ -97,8 +99,8 @@ private byte[] readEntries(int len) throws IOException { ByteBuf entryByteBuf = entriesByteBuf.get(0); int readableBytes = entryByteBuf.readableBytes(); int read = Math.min(readableBytes, len); - byte[] buf = new byte[read]; - entryByteBuf.getBytes(currentOffset, buf); + ByteBuf buf = entryByteBuf.slice(currentOffset, read); + buf.retain(); currentOffset += read; entryByteBuf.readerIndex(currentOffset); bytesReadOffset += read; @@ -117,10 +119,10 @@ private byte[] readEntries(int len) throws IOException { if (dataBlockFullOffset == blockSize) { dataBlockFullOffset = bytesReadOffset; } - return new byte[]{ - BLOCK_END_PADDING_BYTES[(bytesReadOffset++ - dataBlockFullOffset) % BLOCK_END_PADDING_BYTES.length] - }; - + PADDING_BUF.clear(); + PADDING_BUF.writeByte(BLOCK_END_PADDING_BYTES[(bytesReadOffset++ - dataBlockFullOffset) + % BLOCK_END_PADDING_BYTES.length]); + return PADDING_BUF.retain(); } } @@ -218,11 +220,11 @@ public int read(byte[] b, int off, int len) throws IOException { // reading ledger entries if (bytesReadOffset < blockSize) { - byte[] readEntries = readEntries(readLen); - for (int i = 0; i < readEntries.length; i++) { - b[offset + i] = readEntries[i]; - readBytes++; - } + ByteBuf readEntries = readEntries(readLen); + int read = readEntries.readableBytes(); + readEntries.readBytes(b, offset, read); + readEntries.release(); + readBytes += read; return readBytes; } From 187f2ec7f69946640a9113ca868162cce3969512 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Thu, 21 Apr 2022 14:56:44 +0800 Subject: [PATCH 07/13] Release buf when the stream close --- .../offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index 858c48d0e4fcc..f63b2a91d50ad 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -275,6 +275,8 @@ public void close() throws IOException { entriesByteBuf.forEach(buf -> buf.release()); entriesByteBuf.clear(); } + PADDING_BUF.clear(); + PADDING_BUF.release(); } @Override From 8396adae35f1c68e99d71ae4c8457df73e21b045 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Sun, 24 Apr 2022 11:27:38 +0800 Subject: [PATCH 08/13] Fix the ut failure --- .../offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index f63b2a91d50ad..f2172bbef7602 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -49,7 +49,7 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD }; static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD); - private static final ByteBuf PADDING_BUF = PulsarByteBufAllocator.DEFAULT.buffer(1, 1); + private final ByteBuf PADDING_BUF = PulsarByteBufAllocator.DEFAULT.buffer(1, 1); private final ReadHandle ledger; private final long startEntryId; @@ -275,7 +275,6 @@ public void close() throws IOException { entriesByteBuf.forEach(buf -> buf.release()); entriesByteBuf.clear(); } - PADDING_BUF.clear(); PADDING_BUF.release(); } From bb8a3da9132d22eaf49dd6c64f34b9aa4f54de90 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Sun, 24 Apr 2022 11:33:40 +0800 Subject: [PATCH 09/13] Fix the checkstyle --- .../jcloud/impl/BlockAwareSegmentInputStreamImpl.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index f2172bbef7602..e96048d310edb 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -49,7 +49,7 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD }; static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD); - private final ByteBuf PADDING_BUF = PulsarByteBufAllocator.DEFAULT.buffer(1, 1); + private final ByteBuf paddingBuf = PulsarByteBufAllocator.DEFAULT.buffer(1, 1); private final ReadHandle ledger; private final long startEntryId; @@ -130,10 +130,10 @@ private ByteBuf readEntries(int len) throws IOException { if (dataBlockFullOffset == blockSize) { dataBlockFullOffset = bytesReadOffset; } - PADDING_BUF.clear(); - PADDING_BUF.writeByte(BLOCK_END_PADDING_BYTES[(bytesReadOffset++ - dataBlockFullOffset) + paddingBuf.clear(); + paddingBuf.writeByte(BLOCK_END_PADDING_BYTES[(bytesReadOffset++ - dataBlockFullOffset) % BLOCK_END_PADDING_BYTES.length]); - return PADDING_BUF.retain(); + return paddingBuf.retain(); } } @@ -275,7 +275,8 @@ public void close() throws IOException { entriesByteBuf.forEach(buf -> buf.release()); entriesByteBuf.clear(); } - PADDING_BUF.release(); + paddingBuf.clear(); + paddingBuf.release(); } @Override From bc90b45f3ebc96d3ffc7274dfce7940293c36daf Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Sun, 24 Apr 2022 20:00:50 +0800 Subject: [PATCH 10/13] Fix the tests --- .../jcloud/impl/BlockAwareSegmentInputStreamImpl.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index e96048d310edb..c0db47944b76c 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -275,8 +275,10 @@ public void close() throws IOException { entriesByteBuf.forEach(buf -> buf.release()); entriesByteBuf.clear(); } - paddingBuf.clear(); - paddingBuf.release(); + if (paddingBuf.refCnt() != 0) { + paddingBuf.clear(); + paddingBuf.release(); + } } @Override From fcb7fd4fc2bcc602edbabc844dff6e72c936d5b2 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Fri, 29 Apr 2022 10:33:10 +0800 Subject: [PATCH 11/13] Address comment --- .../BlockAwareSegmentInputStreamImpl.java | 27 +++-- .../BlockAwareSegmentInputStreamTest.java | 109 +++++++++++++++++- 2 files changed, 123 insertions(+), 13 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index c0db47944b76c..b8209c76b00ba 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -29,6 +29,8 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; @@ -73,8 +75,8 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre private List entriesByteBuf = null; private LedgerOffloaderStats offloaderStats; private String topicName; - private int currentOffset = 0; + private final AtomicBoolean close = new AtomicBoolean(false); public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) { this.ledger = ledger; @@ -214,9 +216,9 @@ private List readNextEntriesFromLedger(long start, long maxNumberEntrie @Override public int read(byte[] b, int off, int len) throws IOException { if (b == null) { - throw new NullPointerException(); + throw new NullPointerException("The given bytes are null"); } else if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); + throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length); } else if (len == 0) { return 0; } @@ -269,13 +271,18 @@ public int read() throws IOException { @Override public void close() throws IOException { - super.close(); - dataBlockHeaderStream.close(); - if (!entriesByteBuf.isEmpty()) { - entriesByteBuf.forEach(buf -> buf.release()); - entriesByteBuf.clear(); - } - if (paddingBuf.refCnt() != 0) { + // The close method will be triggered twice in the BlobStoreManagedLedgerOffloader#offload method. + // The stream resource used by the try-with block which will called the close + // And through debug, writeBlobStore.uploadMultipartPart in the offload method also will trigger + // the close method. + // So we add the close variable to avoid release paddingBuf twice. + if (!close.compareAndSet(false, true)) { + super.close(); + dataBlockHeaderStream.close(); + if (!entriesByteBuf.isEmpty()) { + entriesByteBuf.forEach(buf -> buf.release()); + entriesByteBuf.clear(); + } paddingBuf.clear(); paddingBuf.release(); } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index 1f37bad514dec..cb3dd7faa5a88 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.fail; import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; @@ -45,6 +46,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader; +import org.jclouds.io.ByteStreams2; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import org.testng.collections.Lists; @@ -216,8 +218,10 @@ public static Object[][] useBufferRead() { }; } - @Test(dataProvider = "useBufferRead") - public void testHaveEndPadding(boolean useBufferRead) throws Exception { +// @Test(dataProvider = "useBufferRead") + @Test + public void testHaveEndPadding() throws Exception { + boolean useBufferRead = true; int ledgerId = 1; int entrySize = 8; int lac = 160; @@ -286,6 +290,7 @@ public void testHaveEndPadding(boolean useBufferRead) throws Exception { while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) { offset += ret; } + assertEquals(inputStream.read(padding, 0, padding.length), -1); } else { int len = left; int offset = 0; @@ -519,6 +524,7 @@ public void testNoEntryPutIn(boolean useBufferRead) throws Exception { while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) { offset += ret; } + assertEquals(inputStream.read(padding, 0, padding.length), -1); } else { int len = padding.length; int offset = 0; @@ -621,6 +627,7 @@ public void testPaddingOnLastBlock(boolean useBufferRead) throws Exception { while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) { offset += ret; } + assertEquals(inputStream.read(padding, 0, padding.length), -1); } else { int len = blockSize - consumedBytes; int offset = 0; @@ -691,10 +698,106 @@ public void testOnlyNegativeOnEOFWithBufferedRead() throws IOException { int bytesRead = 0; int ret; + int offset = 0; + int resetOffsetCount = 0; byte[] buf = new byte[1024]; - while ((ret = inputStream.read(buf, 0, buf.length)) > 0) { + while ((ret = inputStream.read(buf, offset, buf.length - offset)) > 0) { bytesRead += ret; + int currentOffset = offset; + offset = (offset + ret) % buf.length; + if (offset < currentOffset) { + resetOffsetCount++; + } } assertEquals(bytesRead, blockSize); + assertNotEquals(resetOffsetCount, 0); + } + + // This test is for testing the read(byte[] buf, int off, int len) method can work properly + // on the offset not 0. + @Test + public void testReadTillLacWithSmallBuffer() throws Exception { + // simulate last data block read. + int ledgerId = 1; + int entrySize = 8; + int lac = 89; + ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac); + + // set block size equals to (header + lac_entry) size. + int blockSize = DataBlockHeaderImpl.getDataStartOffset() + (1 + lac) * (entrySize + 4 + 8); + BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize); + int expectedEntryCount = (blockSize - DataBlockHeaderImpl.getDataStartOffset()) / (entrySize + 4 + 8); + + // verify get methods + assertEquals(inputStream.getLedger(), readHandle); + assertEquals(inputStream.getStartEntryId(), 0); + assertEquals(inputStream.getBlockSize(), blockSize); + + // verify read inputStream + // 1. read header. 128 + byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; + // read twice to test the offset not 0 case + int ret = inputStream.read(headerB, 0, 66); + assertEquals(ret, 66); + ret = inputStream.read(headerB, 66, headerB.length - 66); + assertEquals(headerB.length - 66, ret); + DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); + assertEquals(headerRead.getBlockLength(), blockSize); + assertEquals(headerRead.getFirstEntryId(), 0); + + byte[] entryData = new byte[entrySize]; + Arrays.fill(entryData, (byte)0xB); // 0xB is MockLedgerEntry.blockPadding + + // 2. read Ledger entries. 96 * 20 + IntStream.range(0, expectedEntryCount).forEach(i -> { + try { + byte lengthBuf[] = new byte[4]; + byte entryIdBuf[] = new byte[8]; + byte content[] = new byte[entrySize]; + + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + + Random random = new Random(System.currentTimeMillis()); + int o = 0; + int totalRead = 0; + int maxReadTime = 10; + while (o != content.length) { + int r; + if (maxReadTime-- == 0) { + r = entrySize - o; + } else { + r = random.nextInt(entrySize - o); + } + read = inputStream.read(content, o, r); + totalRead += read; + o += r; + } + assertEquals(totalRead, entrySize); + + assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); + assertEquals(i, Longs.fromByteArray(entryIdBuf)); + assertArrayEquals(entryData, content); + } catch (Exception e) { + fail("meet exception", e); + } + }); + + // 3. should have no padding + int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() - expectedEntryCount * (entrySize + 4 + 8); + assertEquals(left, 0); + + // 4. reach end. + byte[] b = new byte[4]; + ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + + assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount); + assertEquals(inputStream.getBlockEntryBytesCount(), entrySize * expectedEntryCount); + assertEquals(inputStream.getEndEntryId(), expectedEntryCount - 1); + + inputStream.close(); } } From 485fad4a17af0721d2ab493c06c06b3e9b52758c Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Fri, 29 Apr 2022 10:58:27 +0800 Subject: [PATCH 12/13] fix the style issue --- .../offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java | 1 - .../offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java | 1 - 2 files changed, 2 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index b8209c76b00ba..7133d48a0c94d 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -30,7 +30,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index cb3dd7faa5a88..e1708763abef3 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -46,7 +46,6 @@ import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader; -import org.jclouds.io.ByteStreams2; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import org.testng.collections.Lists; From b0b24b48ec586d572f4c65c19ba9f9a9c9924714 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Thu, 26 May 2022 10:27:05 +0800 Subject: [PATCH 13/13] Address comments --- .../impl/BlockAwareSegmentInputStreamImpl.java | 13 ++++++++++--- .../impl/BlockAwareSegmentInputStreamTest.java | 7 +++---- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index 7133d48a0c94d..52d069b0f990b 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -50,7 +50,7 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD }; static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD); - private final ByteBuf paddingBuf = PulsarByteBufAllocator.DEFAULT.buffer(1, 1); + private final ByteBuf paddingBuf = PulsarByteBufAllocator.DEFAULT.buffer(128, 128); private final ReadHandle ledger; private final long startEntryId; @@ -132,8 +132,10 @@ private ByteBuf readEntries(int len) throws IOException { dataBlockFullOffset = bytesReadOffset; } paddingBuf.clear(); - paddingBuf.writeByte(BLOCK_END_PADDING_BYTES[(bytesReadOffset++ - dataBlockFullOffset) - % BLOCK_END_PADDING_BYTES.length]); + for (int i = 0; i < Math.min(len, paddingBuf.capacity()); i++) { + paddingBuf.writeByte(BLOCK_END_PADDING_BYTES[(bytesReadOffset++ - dataBlockFullOffset) + % BLOCK_END_PADDING_BYTES.length]); + } return paddingBuf.retain(); } } @@ -239,6 +241,7 @@ public int read(byte[] b, int off, int len) throws IOException { // reading ledger entries if (bytesReadOffset < blockSize) { + readLen = Math.min(readLen, blockSize - bytesReadOffset); ByteBuf readEntries = readEntries(readLen); int read = readEntries.readableBytes(); readEntries.readBytes(b, offset, read); @@ -302,6 +305,10 @@ public int getBlockSize() { return blockSize; } + public int getDataBlockFullOffset() { + return dataBlockFullOffset; + } + @Override public int getBlockEntryCount() { return blockEntryCount; diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index e1708763abef3..0cd4bbd70a9e6 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -217,10 +217,8 @@ public static Object[][] useBufferRead() { }; } -// @Test(dataProvider = "useBufferRead") - @Test - public void testHaveEndPadding() throws Exception { - boolean useBufferRead = true; + @Test(dataProvider = "useBufferRead") + public void testHaveEndPadding(boolean useBufferRead) throws Exception { int ledgerId = 1; int entrySize = 8; int lac = 160; @@ -787,6 +785,7 @@ public void testReadTillLacWithSmallBuffer() throws Exception { // 3. should have no padding int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() - expectedEntryCount * (entrySize + 4 + 8); assertEquals(left, 0); + assertEquals(inputStream.getBlockSize(), inputStream.getDataBlockFullOffset()); // 4. reach end. byte[] b = new byte[4];