diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index a4ffdea65098f..b69f9f5e78544 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import java.io.IOException; @@ -27,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; @@ -44,6 +46,9 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class); static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD }; + static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD); + + private final ByteBuf paddingBuf = PulsarByteBufAllocator.DEFAULT.buffer(128, 128); private final ReadHandle ledger; private final long startEntryId; @@ -65,6 +70,9 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */; // Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content. private List entriesByteBuf = null; + private int currentOffset = 0; + private final AtomicBoolean close = new AtomicBoolean(false); + public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) { this.ledger = ledger; @@ -76,6 +84,52 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in this.entriesByteBuf = Lists.newLinkedList(); } + private ByteBuf readEntries(int len) throws IOException { + checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); + checkState(bytesReadOffset < blockSize); + + // once reach the end of entry buffer, read more, if there is more + if (bytesReadOffset < dataBlockFullOffset + && entriesByteBuf.isEmpty() + && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) { + entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); + } + + if (!entriesByteBuf.isEmpty() + && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) { + // always read from the first ByteBuf in the list, once read all of its content remove it. + ByteBuf entryByteBuf = entriesByteBuf.get(0); + int readableBytes = entryByteBuf.readableBytes(); + int read = Math.min(readableBytes, len); + ByteBuf buf = entryByteBuf.slice(currentOffset, read); + buf.retain(); + currentOffset += read; + entryByteBuf.readerIndex(currentOffset); + bytesReadOffset += read; + + if (entryByteBuf.readableBytes() == 0) { + entryByteBuf.release(); + entriesByteBuf.remove(0); + blockEntryCount++; + currentOffset = 0; + } + + return buf; + } else { + // no space for a new entry or there are no more entries + // set data block full, return end padding + if (dataBlockFullOffset == blockSize) { + dataBlockFullOffset = bytesReadOffset; + } + paddingBuf.clear(); + for (int i = 0; i < Math.min(len, paddingBuf.capacity()); i++) { + paddingBuf.writeByte(BLOCK_END_PADDING_BYTES[(bytesReadOffset++ - dataBlockFullOffset) + % BLOCK_END_PADDING_BYTES.length]); + } + return paddingBuf.retain(); + } + } + // read ledger entries. private int readEntries() throws IOException { checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); @@ -143,6 +197,46 @@ private List readNextEntriesFromLedger(long start, long maxNumberEntrie } } + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException("The given bytes are null"); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length); + } else if (len == 0) { + return 0; + } + + int offset = off; + int readLen = len; + int readBytes = 0; + // reading header + if (dataBlockHeaderStream.available() > 0) { + int read = dataBlockHeaderStream.read(b, off, len); + offset += read; + readLen -= read; + readBytes += read; + bytesReadOffset += read; + } + if (readLen == 0) { + return readBytes; + } + + // reading ledger entries + if (bytesReadOffset < blockSize) { + readLen = Math.min(readLen, blockSize - bytesReadOffset); + ByteBuf readEntries = readEntries(readLen); + int read = readEntries.readableBytes(); + readEntries.readBytes(b, offset, read); + readEntries.release(); + readBytes += read; + return readBytes; + } + + // reached end + return -1; + } + @Override public int read() throws IOException { // reading header @@ -162,11 +256,20 @@ public int read() throws IOException { @Override public void close() throws IOException { - super.close(); - dataBlockHeaderStream.close(); - if (!entriesByteBuf.isEmpty()) { - entriesByteBuf.forEach(buf -> buf.release()); - entriesByteBuf.clear(); + // The close method will be triggered twice in the BlobStoreManagedLedgerOffloader#offload method. + // The stream resource used by the try-with block which will called the close + // And through debug, writeBlobStore.uploadMultipartPart in the offload method also will trigger + // the close method. + // So we add the close variable to avoid release paddingBuf twice. + if (!close.compareAndSet(false, true)) { + super.close(); + dataBlockHeaderStream.close(); + if (!entriesByteBuf.isEmpty()) { + entriesByteBuf.forEach(buf -> buf.release()); + entriesByteBuf.clear(); + } + paddingBuf.clear(); + paddingBuf.release(); } } @@ -185,6 +288,10 @@ public int getBlockSize() { return blockSize; } + public int getDataBlockFullOffset() { + return dataBlockFullOffset; + } + @Override public int getBlockEntryCount() { return blockEntryCount; diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index 5cf6bd5650003..0cd4bbd70a9e6 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.fail; import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; @@ -28,6 +29,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; @@ -44,6 +46,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import org.testng.collections.Lists; @@ -206,8 +209,16 @@ public CompletableFuture closeAsync() { } } - @Test - public void testHaveEndPadding() throws Exception { + @DataProvider(name = "useBufferRead") + public static Object[][] useBufferRead() { + return new Object[][]{ + {Boolean.TRUE}, + {Boolean.FALSE} + }; + } + + @Test(dataProvider = "useBufferRead") + public void testHaveEndPadding(boolean useBufferRead) throws Exception { int ledgerId = 1; int entrySize = 8; int lac = 160; @@ -226,7 +237,12 @@ public void testHaveEndPadding() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -240,9 +256,18 @@ public void testHaveEndPadding() throws Exception { byte lengthBuf[] = new byte[4]; byte entryIdBuf[] = new byte[8]; byte content[] = new byte[entrySize]; - inputStream.read(lengthBuf); - inputStream.read(entryIdBuf); - inputStream.read(content); + if (useBufferRead) { + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + read = inputStream.read(content, 0, entrySize); + assertEquals(read, entrySize); + } else { + inputStream.read(lengthBuf); + inputStream.read(entryIdBuf); + inputStream.read(content); + } assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); assertEquals(i, Longs.fromByteArray(entryIdBuf)); @@ -256,13 +281,36 @@ public void testHaveEndPadding() throws Exception { int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() - expectedEntryCount * (entrySize + 4 + 8); assertEquals(left, 5); byte padding[] = new byte[left]; - inputStream.read(padding); + if (useBufferRead) { + int ret = 0; + int offset = 0; + while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) { + offset += ret; + } + assertEquals(inputStream.read(padding, 0, padding.length), -1); + } else { + int len = left; + int offset = 0; + byte[] buf = new byte[4]; + while (len > 0) { + int ret = inputStream.read(buf); + for (int i = 0; i < ret; i++) { + padding[offset++] = buf[i]; + } + len -= ret; + } + } ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding); IntStream.range(0, paddingBuf.capacity()/4).forEach(i -> assertEquals(Integer.toHexString(paddingBuf.readInt()), Integer.toHexString(0xFEDCDEAD))); // 4. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount); @@ -272,8 +320,8 @@ public void testHaveEndPadding() throws Exception { inputStream.close(); } - @Test - public void testNoEndPadding() throws Exception { + @Test(dataProvider = "useBufferRead") + public void testNoEndPadding(boolean useBufferRead) throws Exception { int ledgerId = 1; int entrySize = 8; int lac = 120; @@ -293,7 +341,12 @@ public void testNoEndPadding() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -307,9 +360,18 @@ public void testNoEndPadding() throws Exception { byte lengthBuf[] = new byte[4]; byte entryIdBuf[] = new byte[8]; byte content[] = new byte[entrySize]; - inputStream.read(lengthBuf); - inputStream.read(entryIdBuf); - inputStream.read(content); + if (useBufferRead) { + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + read = inputStream.read(content, 0, entrySize); + assertEquals(read, entrySize); + } else { + inputStream.read(lengthBuf); + inputStream.read(entryIdBuf); + inputStream.read(content); + } assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); assertEquals(i, Longs.fromByteArray(entryIdBuf)); @@ -324,6 +386,11 @@ public void testNoEndPadding() throws Exception { assertEquals(left, 0); // 4. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount); @@ -333,8 +400,8 @@ public void testNoEndPadding() throws Exception { inputStream.close(); } - @Test - public void testReadTillLac() throws Exception { + @Test(dataProvider = "useBufferRead") + public void testReadTillLac(boolean useBufferRead) throws Exception { // simulate last data block read. int ledgerId = 1; int entrySize = 8; @@ -354,7 +421,12 @@ public void testReadTillLac() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -368,9 +440,18 @@ public void testReadTillLac() throws Exception { byte lengthBuf[] = new byte[4]; byte entryIdBuf[] = new byte[8]; byte content[] = new byte[entrySize]; - inputStream.read(lengthBuf); - inputStream.read(entryIdBuf); - inputStream.read(content); + if (useBufferRead) { + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + read = inputStream.read(content, 0, entrySize); + assertEquals(read, entrySize); + } else { + inputStream.read(lengthBuf); + inputStream.read(entryIdBuf); + inputStream.read(content); + } assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); assertEquals(i, Longs.fromByteArray(entryIdBuf)); @@ -385,6 +466,11 @@ public void testReadTillLac() throws Exception { assertEquals(left, 0); // 4. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount); @@ -394,8 +480,8 @@ public void testReadTillLac() throws Exception { inputStream.close(); } - @Test - public void testNoEntryPutIn() throws Exception { + @Test(dataProvider = "useBufferRead") + public void testNoEntryPutIn(boolean useBufferRead) throws Exception { // simulate first entry size over the block size budget, it shouldn't be added. // 2 entries, each with bigger size than block size, so there should no entry added into block. int ledgerId = 1; @@ -416,7 +502,12 @@ public void testNoEntryPutIn() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -424,13 +515,36 @@ public void testNoEntryPutIn() throws Exception { // 2. since no entry put in, it should only get padding after header. byte padding[] = new byte[blockSize - DataBlockHeaderImpl.getDataStartOffset()]; - inputStream.read(padding); + if (useBufferRead) { + int ret = 0; + int offset = 0; + while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) { + offset += ret; + } + assertEquals(inputStream.read(padding, 0, padding.length), -1); + } else { + int len = padding.length; + int offset = 0; + byte[] buf = new byte[4]; + while (len > 0) { + int ret = inputStream.read(buf); + for (int i = 0; i < ret; i++) { + padding[offset++] = buf[i]; + } + len -= ret; + } + } ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding); IntStream.range(0, paddingBuf.capacity()/4).forEach(i -> assertEquals(Integer.toHexString(paddingBuf.readInt()), Integer.toHexString(0xFEDCDEAD))); // 3. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), 0); @@ -440,8 +554,8 @@ public void testNoEntryPutIn() throws Exception { inputStream.close(); } - @Test - public void testPaddingOnLastBlock() throws Exception { + @Test(dataProvider = "useBufferRead") + public void testPaddingOnLastBlock(boolean useBufferRead) throws Exception { int ledgerId = 1; int entrySize = 1000; int lac = 0; @@ -460,7 +574,12 @@ public void testPaddingOnLastBlock() throws Exception { // verify read inputStream // 1. read header. 128 byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; - ByteStreams.readFully(inputStream, headerB); + if (useBufferRead) { + int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset()); + assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret); + } else { + ByteStreams.readFully(inputStream, headerB); + } DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); assertEquals(headerRead.getBlockLength(), blockSize); assertEquals(headerRead.getFirstEntryId(), 0); @@ -474,9 +593,18 @@ public void testPaddingOnLastBlock() throws Exception { byte lengthBuf[] = new byte[4]; byte entryIdBuf[] = new byte[8]; byte content[] = new byte[entrySize]; - inputStream.read(lengthBuf); - inputStream.read(entryIdBuf); - inputStream.read(content); + if (useBufferRead) { + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + read = inputStream.read(content, 0, entrySize); + assertEquals(read, entrySize); + } else { + inputStream.read(lengthBuf); + inputStream.read(entryIdBuf); + inputStream.read(content); + } assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); assertEquals(i, Longs.fromByteArray(entryIdBuf)); @@ -490,13 +618,36 @@ public void testPaddingOnLastBlock() throws Exception { int consumedBytes = DataBlockHeaderImpl.getDataStartOffset() + expectedEntryCount * (entrySize + BlockAwareSegmentInputStreamImpl.ENTRY_HEADER_SIZE); byte padding[] = new byte[blockSize - consumedBytes]; - inputStream.read(padding); + if (useBufferRead) { + int ret = 0; + int offset = 0; + while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) { + offset += ret; + } + assertEquals(inputStream.read(padding, 0, padding.length), -1); + } else { + int len = blockSize - consumedBytes; + int offset = 0; + byte[] buf = new byte[4]; + while (len > 0) { + int ret = inputStream.read(buf); + for (int i = 0; i < ret; i++) { + padding[offset++] = buf[i]; + } + len -= ret; + } + } ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding); IntStream.range(0, paddingBuf.capacity()/4).forEach(i -> assertEquals(Integer.toHexString(paddingBuf.readInt()), Integer.toHexString(0xFEDCDEAD))); // 3. reach end. + if (useBufferRead) { + byte[] b = new byte[4]; + int ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + } assertEquals(inputStream.read(), -1); assertEquals(inputStream.getBlockEntryCount(), 1); @@ -530,4 +681,121 @@ public void testOnlyNegativeOnEOF() throws Exception { } } + @Test + public void testOnlyNegativeOnEOFWithBufferedRead() throws IOException { + int ledgerId = 1; + int entrySize = 10000; + int lac = 0; + + Random r = new Random(0); + ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac, () -> (byte)r.nextInt()); + + int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 2; + BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize); + + int bytesRead = 0; + int ret; + int offset = 0; + int resetOffsetCount = 0; + byte[] buf = new byte[1024]; + while ((ret = inputStream.read(buf, offset, buf.length - offset)) > 0) { + bytesRead += ret; + int currentOffset = offset; + offset = (offset + ret) % buf.length; + if (offset < currentOffset) { + resetOffsetCount++; + } + } + assertEquals(bytesRead, blockSize); + assertNotEquals(resetOffsetCount, 0); + } + + // This test is for testing the read(byte[] buf, int off, int len) method can work properly + // on the offset not 0. + @Test + public void testReadTillLacWithSmallBuffer() throws Exception { + // simulate last data block read. + int ledgerId = 1; + int entrySize = 8; + int lac = 89; + ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac); + + // set block size equals to (header + lac_entry) size. + int blockSize = DataBlockHeaderImpl.getDataStartOffset() + (1 + lac) * (entrySize + 4 + 8); + BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize); + int expectedEntryCount = (blockSize - DataBlockHeaderImpl.getDataStartOffset()) / (entrySize + 4 + 8); + + // verify get methods + assertEquals(inputStream.getLedger(), readHandle); + assertEquals(inputStream.getStartEntryId(), 0); + assertEquals(inputStream.getBlockSize(), blockSize); + + // verify read inputStream + // 1. read header. 128 + byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()]; + // read twice to test the offset not 0 case + int ret = inputStream.read(headerB, 0, 66); + assertEquals(ret, 66); + ret = inputStream.read(headerB, 66, headerB.length - 66); + assertEquals(headerB.length - 66, ret); + DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); + assertEquals(headerRead.getBlockLength(), blockSize); + assertEquals(headerRead.getFirstEntryId(), 0); + + byte[] entryData = new byte[entrySize]; + Arrays.fill(entryData, (byte)0xB); // 0xB is MockLedgerEntry.blockPadding + + // 2. read Ledger entries. 96 * 20 + IntStream.range(0, expectedEntryCount).forEach(i -> { + try { + byte lengthBuf[] = new byte[4]; + byte entryIdBuf[] = new byte[8]; + byte content[] = new byte[entrySize]; + + int read = inputStream.read(lengthBuf, 0, 4); + assertEquals(read, 4); + read = inputStream.read(entryIdBuf, 0, 8); + assertEquals(read, 8); + + Random random = new Random(System.currentTimeMillis()); + int o = 0; + int totalRead = 0; + int maxReadTime = 10; + while (o != content.length) { + int r; + if (maxReadTime-- == 0) { + r = entrySize - o; + } else { + r = random.nextInt(entrySize - o); + } + read = inputStream.read(content, o, r); + totalRead += read; + o += r; + } + assertEquals(totalRead, entrySize); + + assertEquals(entrySize, Ints.fromByteArray(lengthBuf)); + assertEquals(i, Longs.fromByteArray(entryIdBuf)); + assertArrayEquals(entryData, content); + } catch (Exception e) { + fail("meet exception", e); + } + }); + + // 3. should have no padding + int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() - expectedEntryCount * (entrySize + 4 + 8); + assertEquals(left, 0); + assertEquals(inputStream.getBlockSize(), inputStream.getDataBlockFullOffset()); + + // 4. reach end. + byte[] b = new byte[4]; + ret = inputStream.read(b, 0, 4); + assertEquals(ret, -1); + + assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount); + assertEquals(inputStream.getBlockEntryBytesCount(), entrySize * expectedEntryCount); + assertEquals(inputStream.getEndEntryId(), expectedEntryCount - 1); + + inputStream.close(); + } }