diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 58b53622c8f2..efe30bb0b355 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1059,6 +1059,13 @@ public enum OperationStatusCode { public static final long HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT = 32 * 1024 * 1024L; + /** + * Configuration key for setting pread must read both necessaryLen and extraLen, default is + * disabled. This is an optimized flag for reading HFile from blob storage. + */ + public static final String HFILE_PREAD_ALL_BYTES_ENABLED_KEY = "hfile.pread.all.bytes.enabled"; + public static final boolean HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT = false; + /* * Minimum percentage of free heap necessary for a successful cluster startup. */ diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java index 418d09c38af1..1720cae2300c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java @@ -228,21 +228,43 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec */ public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position, int necessaryLen, int extraLen) throws IOException { + return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false); + } + + /** + * Read from an input stream at least necessaryLen and if possible, + * extraLen also if available. Analogous to + * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and + * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to + * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer + * directly, and does not allocate a temporary byte array. + * @param buff ByteBuff to read into. + * @param dis the input stream to read from + * @param position the position within the stream from which to start reading + * @param necessaryLen the number of bytes that are absolutely necessary to read + * @param extraLen the number of extra bytes that would be nice to read + * @param readAllBytes whether we must read the necessaryLen and extraLen + * @return true if and only if extraLen is > 0 and reading those extra bytes was successful + * @throws IOException if failed to read the necessary bytes + */ + public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position, + int necessaryLen, int extraLen, boolean readAllBytes) throws IOException { boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer"); if (preadbytebuffer) { - return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen); + return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes); } else { - return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen); + return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes); } } private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position, - int necessaryLen, int extraLen) throws IOException { + int necessaryLen, int extraLen, boolean readAllBytes) throws IOException { int remain = necessaryLen + extraLen; byte[] buf = new byte[remain]; int bytesRead = 0; - while (bytesRead < necessaryLen) { + int lengthMustRead = readAllBytes ? remain : necessaryLen; + while (bytesRead < lengthMustRead) { int ret = dis.read(position + bytesRead, buf, bytesRead, remain); if (ret < 0) { throw new IOException("Premature EOF from inputStream (positional read returned " + ret @@ -257,11 +279,12 @@ private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis } private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position, - int necessaryLen, int extraLen) throws IOException { + int necessaryLen, int extraLen, boolean readAllBytes) throws IOException { int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0; ByteBuffer[] buffers = buff.nioByteBuffers(); ByteBuffer cur = buffers[idx]; - while (bytesRead < necessaryLen) { + int lengthMustRead = readAllBytes ? remain : necessaryLen; + while (bytesRead < lengthMustRead) { int ret; while (!cur.hasRemaining()) { if (++idx >= buffers.length) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 747fa16221f8..2cd60901613b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1349,6 +1349,8 @@ static class FSReaderImpl implements FSReader { private final Lock streamLock = new ReentrantLock(); + private final boolean isPreadAllBytes; + FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator, Configuration conf) throws IOException { this.fileSize = readerContext.getFileSize(); @@ -1365,6 +1367,7 @@ static class FSReaderImpl implements FSReader { this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); defaultDecodingCtx = new HFileBlockDefaultDecodingContext(conf, fileContext); encodedBlockDecodingCtx = defaultDecodingCtx; + isPreadAllBytes = readerContext.isPreadAllBytes(); } @Override @@ -1453,7 +1456,9 @@ protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int siz } else { // Positional read. Better for random reads; or when the streamLock is already locked. int extraSize = peekIntoNextBlock ? hdrSize : 0; - if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) { + if ( + !BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes) + ) { // did not read the next block header. return false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java index c652c4a18b53..d6f711d866eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.yetus.audience.InterfaceAudience; @@ -39,6 +40,7 @@ public enum ReaderType { private final HFileSystem hfs; private final boolean primaryReplicaReader; private final ReaderType type; + private final boolean preadAllBytes; public ReaderContext(Path filePath, FSDataInputStreamWrapper fsdis, long fileSize, HFileSystem hfs, boolean primaryReplicaReader, ReaderType type) { @@ -48,6 +50,8 @@ public ReaderContext(Path filePath, FSDataInputStreamWrapper fsdis, long fileSiz this.hfs = hfs; this.primaryReplicaReader = primaryReplicaReader; this.type = type; + this.preadAllBytes = hfs.getConf().getBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, + HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT); } public Path getFilePath() { @@ -73,4 +77,8 @@ public boolean isPrimaryReplicaReader() { public ReaderType getReaderType() { return this.type; } + + public boolean isPreadAllBytes() { + return preadAllBytes; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java index 8f250c5eb52d..efc66111a9f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -28,15 +29,23 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Random; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.util.BlockIOUtils; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.MultiByteBuff; @@ -44,11 +53,13 @@ import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; +import org.junit.rules.TestName; @Category({ IOTests.class, SmallTests.class }) public class TestBlockIOUtils { @@ -57,11 +68,17 @@ public class TestBlockIOUtils { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestBlockIOUtils.class); + @Rule + public TestName testName = new TestName(); + @Rule public ExpectedException exception = ExpectedException.none(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final int NUM_TEST_BLOCKS = 2; + private static final Compression.Algorithm COMPRESSION_ALGO = Compression.Algorithm.GZ; + @Test public void testIsByteBufferReadable() throws IOException { FileSystem fs = TEST_UTIL.getTestFileSystem(); @@ -92,6 +109,103 @@ public void testReadFully() throws IOException { assertArrayEquals(Bytes.toBytes(s), heapBuf); } + @Test + public void testPreadWithReadFullBytes() throws IOException { + testPreadReadFullBytesInternal(true, EnvironmentEdgeManager.currentTime()); + } + + @Test + public void testPreadWithoutReadFullBytes() throws IOException { + testPreadReadFullBytesInternal(false, EnvironmentEdgeManager.currentTime()); + } + + private void testPreadReadFullBytesInternal(boolean readAllBytes, long randomSeed) + throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, readAllBytes); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path path = new Path(TEST_UTIL.getDataTestDirOnTestFS(), testName.getMethodName()); + // give a fixed seed such we can see failure easily. + Random rand = new Random(randomSeed); + long totalDataBlockBytes = + writeBlocks(TEST_UTIL.getConfiguration(), rand, COMPRESSION_ALGO, path); + readDataBlocksAndVerify(fs, path, COMPRESSION_ALGO, totalDataBlockBytes); + } + + private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo, + Path path) throws IOException { + FileSystem fs = HFileSystem.get(conf); + FSDataOutputStream os = fs.create(path); + HFileContext meta = + new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build(); + HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta); + long totalDataBlockBytes = 0; + for (int i = 0; i < NUM_TEST_BLOCKS; ++i) { + int blockTypeOrdinal = rand.nextInt(BlockType.values().length); + if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) { + blockTypeOrdinal = BlockType.DATA.ordinal(); + } + BlockType bt = BlockType.values()[blockTypeOrdinal]; + DataOutputStream dos = hbw.startWriting(bt); + int size = rand.nextInt(500); + for (int j = 0; j < size; ++j) { + dos.writeShort(i + 1); + dos.writeInt(j + 1); + } + + hbw.writeHeaderAndData(os); + totalDataBlockBytes += hbw.getOnDiskSizeWithHeader(); + } + // append a dummy trailer and in a actual HFile it should have more data. + FixedFileTrailer trailer = new FixedFileTrailer(3, 3); + trailer.setFirstDataBlockOffset(0); + trailer.setLastDataBlockOffset(totalDataBlockBytes); + trailer.setComparatorClass(meta.getCellComparator().getClass()); + trailer.setDataIndexCount(NUM_TEST_BLOCKS); + trailer.setCompressionCodec(compressAlgo); + trailer.serialize(os); + // close the stream + os.close(); + return totalDataBlockBytes; + } + + private void readDataBlocksAndVerify(FileSystem fs, Path path, Compression.Algorithm compressAlgo, + long totalDataBlockBytes) throws IOException { + FSDataInputStream is = fs.open(path); + HFileContext fileContext = + new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build(); + ReaderContext context = + new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) + .withReaderType(ReaderContext.ReaderType.PREAD).withFileSize(totalDataBlockBytes) + .withFilePath(path).withFileSystem(fs).build(); + HFileBlock.FSReader hbr = + new HFileBlock.FSReaderImpl(context, fileContext, ByteBuffAllocator.HEAP, fs.getConf()); + + long onDiskSizeOfNextBlock = -1; + long offset = 0; + int numOfReadBlock = 0; + // offset and totalBytes shares the same logic in the HFilePreadReader + while (offset < totalDataBlockBytes) { + HFileBlock block = hbr.readBlockData(offset, onDiskSizeOfNextBlock, true, false, false); + numOfReadBlock++; + try { + onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize(); + offset += block.getOnDiskSizeWithHeader(); + } finally { + block.release(); + } + } + assertEquals(totalDataBlockBytes, offset); + assertEquals(NUM_TEST_BLOCKS, numOfReadBlock); + deleteFile(fs, path); + } + + private void deleteFile(FileSystem fs, Path path) throws IOException { + if (fs.exists(path)) { + fs.delete(path, true); + } + } + @Test public void testReadWithExtra() throws IOException { FileSystem fs = TEST_UTIL.getTestFileSystem();