From 12ee3f56072123e99179015a50df60a7492d6a41 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Mon, 6 Jun 2022 16:31:14 +0800 Subject: [PATCH] [improve][tiered storage] Reduce cpu usage when offloading the ledger (#15063) * [imporve][tiered storage] Reduce cpu usage when offloading the ledger --- *Motivation* When offloading a ledger, the BlockAwareSegmentInputStreamImpl will wrap the ledger handler and make it can stream output. Then the JCloud will read the stream as the payload and upload to the storage. In the JCloud implementation, it read the stream with a buffer https://github.com/apache/jclouds/blob/36f351cd18925d2bb27bf7ad2c5d75e555da377a/core/src/main/java/org/jclouds/io/ByteStreams2.java#L68 In the current offload implementation, the read will call multiple times to construct the buffer and then return the data. After implement the read(byte[] b, int off, int len), the cpu usage reduced almost 10%. *Modifications* - Add read(byte[] b, int off, int len) implementation in the BlockAwareSegmentInputStreamImpl (cherry picked from commit 938ab7befc57a23e5a2bcb0f8bfe5c714c4d0018) --- .../BlockAwareSegmentInputStreamImpl.java | 117 ++++++- .../BlockAwareSegmentInputStreamTest.java | 328 ++++++++++++++++-- 2 files changed, 410 insertions(+), 35 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index a4ffdea65098f..b69f9f5e78544 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import java.io.IOException; @@ -27,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; @@ -44,6 +46,9 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class); static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD }; + static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD); + + private final ByteBuf paddingBuf = PulsarByteBufAllocator.DEFAULT.buffer(128, 128); private final ReadHandle ledger; private final long startEntryId; @@ -65,6 +70,9 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */; // Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content. private List entriesByteBuf = null; + private int currentOffset = 0; + private final AtomicBoolean close = new AtomicBoolean(false); + public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) { this.ledger = ledger; @@ -76,6 +84,52 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in this.entriesByteBuf = Lists.newLinkedList(); } + private ByteBuf readEntries(int len) throws IOException { + checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); + checkState(bytesReadOffset < blockSize); + + // once reach the end of entry buffer, read more, if there is more + if (bytesReadOffset < dataBlockFullOffset + && entriesByteBuf.isEmpty() + && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) { + entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); + } + + if (!entriesByteBuf.isEmpty() + && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) { + // always read from the first ByteBuf in the list, once read all of its content remove it. + ByteBuf entryByteBuf = entriesByteBuf.get(0); + int readableBytes = entryByteBuf.readableBytes(); + int read = Math.min(readableBytes, len); + ByteBuf buf = entryByteBuf.slice(currentOffset, read); + buf.retain(); + currentOffset += read; + entryByteBuf.readerIndex(currentOffset); + bytesReadOffset += read; + + if (entryByteBuf.readableBytes() == 0) { + entryByteBuf.release(); + entriesByteBuf.remove(0); + blockEntryCount++; + currentOffset = 0; + } + + return buf; + } else { + // no space for a new entry or there are no more entries + // set data block full, return end padding + if (dataBlockFullOffset == blockSize) { + dataBlockFullOffset = bytesReadOffset; + } + paddingBuf.clear(); + for (int i = 0; i < Math.min(len, paddingBuf.capacity()); i++) { + paddingBuf.writeByte(BLOCK_END_PADDING_BYTES[(bytesReadOffset++ - dataBlockFullOffset) + % BLOCK_END_PADDING_BYTES.length]); + } + return paddingBuf.retain(); + } + } + // read ledger entries. private int readEntries() throws IOException { checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); @@ -143,6 +197,46 @@ private List readNextEntriesFromLedger(long start, long maxNumberEntrie } } + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException("The given bytes are null"); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length); + } else if (len == 0) { + return 0; + } + + int offset = off; + int readLen = len; + int readBytes = 0; + // reading header + if (dataBlockHeaderStream.available() > 0) { + int read = dataBlockHeaderStream.read(b, off, len); + offset += read; + readLen -= read; + readBytes += read; + bytesReadOffset += read; + } + if (readLen == 0) { + return readBytes; + } + + // reading ledger entries + if (bytesReadOffset < blockSize) { + readLen = Math.min(readLen, blockSize - bytesReadOffset); + ByteBuf readEntries = readEntries(readLen); + int read = readEntries.readableBytes(); + readEntries.readBytes(b, offset, read); + readEntries.release(); + readBytes += read; + return readBytes; + } + + // reached end + return -1; + } + @Override public int read() throws IOException { // reading header @@ -162,11 +256,20 @@ public int read() throws IOException { @Override public void close() throws IOException { - super.close(); - dataBlockHeaderStream.close(); - if (!entriesByteBuf.isEmpty()) { - entriesByteBuf.forEach(buf -> buf.release()); - entriesByteBuf.clear(); + // The close method will be triggered twice in the BlobStoreManagedLedgerOffloader#offload method. + // The stream resource used by the try-with block which will called the close + // And through debug, writeBlobStore.uploadMultipartPart in the offload method also will trigger + // the close method. + // So we add the close variable to avoid release paddingBuf twice. + if (!close.compareAndSet(false, true)) { + super.close(); + dataBlockHeaderStream.close(); + if (!entriesByteBuf.isEmpty()) { + entriesByteBuf.forEach(buf -> buf.release()); + entriesByteBuf.clear(); + } + paddingBuf.clear(); + paddingBuf.release(); } } @@ -185,6 +288,10 @@ public int getBlockSize() { return blockSize; } + public int getDataBlockFullOffset() { + return dataBlockFullOffset; + } + @Override public int getBlockEntryCount() { return blockEntryCount; diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index 5cf6bd5650003..0cd4bbd70a9e6 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.fail; import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; @@ -28,6 +29,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; @@ -44,6 +46,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import org.testng.collections.Lists; @@ -206,8 +209,16 @@ public CompletableFuture closeAsync() { } } - @Test - public void testHaveEndPadding() throws Exception { + @DataProvider(name = "useBufferRead") + public static Object[][] useBufferRead() { + return new Object[][]{ + {Boolean.TRUE}, + {Boolean.FALSE} + }; + } + + @Test(dataProvider = "useBufferRead") + public void testHaveEndPadding(boolean useBufferRead) throws Exception { int ledgerId = 1; int entrySize = 8; int lac = 160; @@ -226,7 +237,12 @@ public void testHaveEndPadding() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -240,9 +256,18 @@ public void testHaveEndPadding() throws Exception { byte lengthBuf[] = new byte[4]; byte entryIdBuf[] = new byte[8]; byte content[] = new byte[entrySize]; - inputStream.read(lengthBuf); - inputStream.read(entryIdBuf); - inputStream.read(content); + if (useBufferRead) { + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + read = inputStream.read(content, 0, entrySize); + assertEquals(read, entrySize); + } else { + inputStream.read(lengthBuf); + inputStream.read(entryIdBuf); + inputStream.read(content); + } assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); assertEquals(i, Longs.fromByteArray(entryIdBuf)); @@ -256,13 +281,36 @@ public void testHaveEndPadding() throws Exception { int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() - expectedEntryCount * (entrySize + 4 + 8); assertEquals(left, 5); byte padding[] = new byte[left]; - inputStream.read(padding); + if (useBufferRead) { + int ret = 0; + int offset = 0; + while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) { + offset += ret; + } + assertEquals(inputStream.read(padding, 0, padding.length), -1); + } else { + int len = left; + int offset = 0; + byte[] buf = new byte[4]; + while (len > 0) { + int ret = inputStream.read(buf); + for (int i = 0; i < ret; i++) { + padding[offset++] = buf[i]; + } + len -= ret; + } + } ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding); IntStream.range(0, paddingBuf.capacity()/4).forEach(i -> assertEquals(Integer.toHexString(paddingBuf.readInt()), Integer.toHexString(0xFEDCDEAD))); // 4. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount); @@ -272,8 +320,8 @@ public void testHaveEndPadding() throws Exception { inputStream.close(); } - @Test - public void testNoEndPadding() throws Exception { + @Test(dataProvider = "useBufferRead") + public void testNoEndPadding(boolean useBufferRead) throws Exception { int ledgerId = 1; int entrySize = 8; int lac = 120; @@ -293,7 +341,12 @@ public void testNoEndPadding() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -307,9 +360,18 @@ public void testNoEndPadding() throws Exception { byte lengthBuf[] = new byte[4]; byte entryIdBuf[] = new byte[8]; byte content[] = new byte[entrySize]; - inputStream.read(lengthBuf); - inputStream.read(entryIdBuf); - inputStream.read(content); + if (useBufferRead) { + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + read = inputStream.read(content, 0, entrySize); + assertEquals(read, entrySize); + } else { + inputStream.read(lengthBuf); + inputStream.read(entryIdBuf); + inputStream.read(content); + } assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); assertEquals(i, Longs.fromByteArray(entryIdBuf)); @@ -324,6 +386,11 @@ public void testNoEndPadding() throws Exception { assertEquals(left, 0); // 4. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount); @@ -333,8 +400,8 @@ public void testNoEndPadding() throws Exception { inputStream.close(); } - @Test - public void testReadTillLac() throws Exception { + @Test(dataProvider = "useBufferRead") + public void testReadTillLac(boolean useBufferRead) throws Exception { // simulate last data block read. int ledgerId = 1; int entrySize = 8; @@ -354,7 +421,12 @@ public void testReadTillLac() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -368,9 +440,18 @@ public void testReadTillLac() throws Exception { byte lengthBuf[] = new byte[4]; byte entryIdBuf[] = new byte[8]; byte content[] = new byte[entrySize]; - inputStream.read(lengthBuf); - inputStream.read(entryIdBuf); - inputStream.read(content); + if (useBufferRead) { + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + read = inputStream.read(content, 0, entrySize); + assertEquals(read, entrySize); + } else { + inputStream.read(lengthBuf); + inputStream.read(entryIdBuf); + inputStream.read(content); + } assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); assertEquals(i, Longs.fromByteArray(entryIdBuf)); @@ -385,6 +466,11 @@ public void testReadTillLac() throws Exception { assertEquals(left, 0); // 4. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount); @@ -394,8 +480,8 @@ public void testReadTillLac() throws Exception { inputStream.close(); } - @Test - public void testNoEntryPutIn() throws Exception { + @Test(dataProvider = "useBufferRead") + public void testNoEntryPutIn(boolean useBufferRead) throws Exception { // simulate first entry size over the block size budget, it shouldn't be added. // 2 entries, each with bigger size than block size, so there should no entry added into block. int ledgerId = 1; @@ -416,7 +502,12 @@ public void testNoEntryPutIn() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -424,13 +515,36 @@ public void testNoEntryPutIn() throws Exception { // 2. since no entry put in, it should only get padding after header. byte padding[] = new byte[blockSize - DataBlockHeaderImpl.getDataStartOffset()]; - inputStream.read(padding); + if (useBufferRead) { + int ret = 0; + int offset = 0; + while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) { + offset += ret; + } + assertEquals(inputStream.read(padding, 0, padding.length), -1); + } else { + int len = padding.length; + int offset = 0; + byte[] buf = new byte[4]; + while (len > 0) { + int ret = inputStream.read(buf); + for (int i = 0; i < ret; i++) { + padding[offset++] = buf[i]; + } + len -= ret; + } + } ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding); IntStream.range(0, paddingBuf.capacity()/4).forEach(i -> assertEquals(Integer.toHexString(paddingBuf.readInt()), Integer.toHexString(0xFEDCDEAD))); // 3. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), 0); @@ -440,8 +554,8 @@ public void testNoEntryPutIn() throws Exception { inputStream.close(); } - @Test - public void testPaddingOnLastBlock() throws Exception { + @Test(dataProvider = "useBufferRead") + public void testPaddingOnLastBlock(boolean useBufferRead) throws Exception { int ledgerId = 1; int entrySize = 1000; int lac = 0; @@ -460,7 +574,12 @@ public void testPaddingOnLastBlock() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -474,9 +593,18 @@ public void testPaddingOnLastBlock() throws Exception { byte lengthBuf[] = new byte[4]; byte entryIdBuf[] = new byte[8]; byte content[] = new byte[entrySize]; - inputStream.read(lengthBuf); - inputStream.read(entryIdBuf); - inputStream.read(content); + if (useBufferRead) { + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + read = inputStream.read(content, 0, entrySize); + assertEquals(read, entrySize); + } else { + inputStream.read(lengthBuf); + inputStream.read(entryIdBuf); + inputStream.read(content); + } assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); assertEquals(i, Longs.fromByteArray(entryIdBuf)); @@ -490,13 +618,36 @@ public void testPaddingOnLastBlock() throws Exception { int consumedBytes = DataBlockHeaderImpl.getDataStartOffset() + expectedEntryCount * (entrySize + BlockAwareSegmentInputStreamImpl.ENTRY_HEADER_SIZE); byte padding[] = new byte[blockSize - consumedBytes]; - inputStream.read(padding); + if (useBufferRead) { + int ret = 0; + int offset = 0; + while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) { + offset += ret; + } + assertEquals(inputStream.read(padding, 0, padding.length), -1); + } else { + int len = blockSize - consumedBytes; + int offset = 0; + byte[] buf = new byte[4]; + while (len > 0) { + int ret = inputStream.read(buf); + for (int i = 0; i < ret; i++) { + padding[offset++] = buf[i]; + } + len -= ret; + } + } ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding); IntStream.range(0, paddingBuf.capacity()/4).forEach(i -> assertEquals(Integer.toHexString(paddingBuf.readInt()), Integer.toHexString(0xFEDCDEAD))); // 3. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), 1); @@ -530,4 +681,121 @@ public void testOnlyNegativeOnEOF() throws Exception { } } + @Test + public void testOnlyNegativeOnEOFWithBufferedRead() throws IOException { + int ledgerId = 1; + int entrySize = 10000; + int lac = 0; + + Random r = new Random(0); + ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac, () -> (byte)r.nextInt()); + + int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 2; + BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize); + + int bytesRead = 0; + int ret; + int offset = 0; + int resetOffsetCount = 0; + byte[] buf = new byte[1024]; + while ((ret = inputStream.read(buf, offset, buf.length - offset)) > 0) { + bytesRead += ret; + int currentOffset = offset; + offset = (offset + ret) % buf.length; + if (offset < currentOffset) { + resetOffsetCount++; + } + } + assertEquals(bytesRead, blockSize); + assertNotEquals(resetOffsetCount, 0); + } + + // This test is for testing the read(byte[] buf, int off, int len) method can work properly + // on the offset not 0. + @Test + public void testReadTillLacWithSmallBuffer() throws Exception { + // simulate last data block read. + int ledgerId = 1; + int entrySize = 8; + int lac = 89; + ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac); + + // set block size equals to (header + lac_entry) size. + int blockSize = DataBlockHeaderImpl.getDataStartOffset() + (1 + lac) * (entrySize + 4 + 8); + BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize); + int expectedEntryCount = (blockSize - DataBlockHeaderImpl.getDataStartOffset()) / (entrySize + 4 + 8); + + // verify get methods + assertEquals(inputStream.getLedger(), readHandle); + assertEquals(inputStream.getStartEntryId(), 0); + assertEquals(inputStream.getBlockSize(), blockSize); + + // verify read inputStream + // 1. read header. 128 + byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; + // read twice to test the offset not 0 case + int ret = inputStream.read(headerB, 0, 66); + assertEquals(ret, 66); + ret = inputStream.read(headerB, 66, headerB.length - 66); + assertEquals(headerB.length - 66, ret); + DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); + assertEquals(headerRead.getBlockLength(), blockSize); + assertEquals(headerRead.getFirstEntryId(), 0); + + byte[] entryData = new byte[entrySize]; + Arrays.fill(entryData, (byte)0xB); // 0xB is MockLedgerEntry.blockPadding + + // 2. read Ledger entries. 96 * 20 + IntStream.range(0, expectedEntryCount).forEach(i -> { + try { + byte lengthBuf[] = new byte[4]; + byte entryIdBuf[] = new byte[8]; + byte content[] = new byte[entrySize]; + + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + + Random random = new Random(System.currentTimeMillis()); + int o = 0; + int totalRead = 0; + int maxReadTime = 10; + while (o != content.length) { + int r; + if (maxReadTime-- == 0) { + r = entrySize - o; + } else { + r = random.nextInt(entrySize - o); + } + read = inputStream.read(content, o, r); + totalRead += read; + o += r; + } + assertEquals(totalRead, entrySize); + + assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); + assertEquals(i, Longs.fromByteArray(entryIdBuf)); + assertArrayEquals(entryData, content); + } catch (Exception e) { + fail("meet exception", e); + } + }); + + // 3. should have no padding + int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() - expectedEntryCount * (entrySize + 4 + 8); + assertEquals(left, 0); + assertEquals(inputStream.getBlockSize(), inputStream.getDataBlockFullOffset()); + + // 4. reach end. + byte[] b = new byte[4]; + ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + + assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount); + assertEquals(inputStream.getBlockEntryBytesCount(), entrySize * expectedEntryCount); + assertEquals(inputStream.getEndEntryId(), expectedEntryCount - 1); + + inputStream.close(); + } }