diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 4b2eaa0bd983..3766da718d5b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1037,6 +1037,13 @@ public enum OperationStatusCode {
   public static final long HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT =
     32 * 1024 * 1024L;
 
+  /**
+   * Configuration key controlling whether pread must read both necessaryLen and extraLen;
+   * disabled by default. This is an optimization flag for reading HFiles from blob storage.
+   */
+  public static final String HFILE_PREAD_ALL_BYTES_ENABLED_KEY = "hfile.pread.all.bytes.enabled";
+  public static final boolean HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT = false;
+
   /*
    * Minimum percentage of free heap necessary for a successful cluster startup.
    */
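
Note: a minimal sketch, not part of this patch, of enabling the new flag programmatically (the key
can equally be set as hfile.pread.all.bytes.enabled in hbase-site.xml); the HBaseConfiguration
setup here is illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    // Ask positional reads to also fill the "extra" bytes (e.g. the next block header) instead of
    // stopping once the necessary bytes have arrived, avoiding a second short read on blob storage.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, true);
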
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
index 1e70d2d3dafd..e24e9f87f824 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
@@ -205,10 +205,31 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec
    */
   public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
     int necessaryLen, int extraLen) throws IOException {
+    return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false);
+  }
+
+  /**
+   * Read from an input stream at least <code>necessaryLen</code> bytes and, if available,
+   * <code>extraLen</code> more. Analogous to
+   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
+   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
+   * read.
+   * @param buff         ByteBuff to read into.
+   * @param dis          the input stream to read from
+   * @param position     the position within the stream from which to start reading
+   * @param necessaryLen the number of bytes that are absolutely necessary to read
+   * @param extraLen     the number of extra bytes that would be nice to read
+   * @param readAllBytes whether we must read both necessaryLen and extraLen
+   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
+   * @throws IOException if we failed to read the necessary bytes
+   */
+  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
+    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
     int remain = necessaryLen + extraLen;
     byte[] buf = new byte[remain];
     int bytesRead = 0;
-    while (bytesRead < necessaryLen) {
+    int lengthMustRead = readAllBytes ? remain : necessaryLen;
+    while (bytesRead < lengthMustRead) {
       int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
       if (ret < 0) {
         throw new IOException("Premature EOF from inputStream (positional read returned " + ret
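
Note: a hedged sketch of how a caller could use the new overload; the helper class, its name and
its parameters are hypothetical and only illustrate the readAllBytes semantics added above:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.hbase.io.util.BlockIOUtils;
    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.nio.SingleByteBuff;

    final class PreadSketch {
      // Hypothetical helper: read one on-disk block plus the next block's header in one pread loop.
      static boolean readBlockWithNextHeader(FSDataInputStream dis, long offset, int onDiskBlockSize,
        int hdrSize) throws IOException {
        ByteBuff buff = new SingleByteBuff(ByteBuffer.allocate(onDiskBlockSize + hdrSize));
        // With readAllBytes=true the loop keeps issuing positional reads until both the necessary
        // bytes (the block) and the extra bytes (the next header) are in the buffer.
        return BlockIOUtils.preadWithExtra(buff, dis, offset, onDiskBlockSize, hdrSize, true);
      }
    }
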
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 505e00d893c5..c08006a7ff7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1345,6 +1345,8 @@ static class FSReaderImpl implements FSReader {
 
     private final Lock streamLock = new ReentrantLock();
 
+    private final boolean isPreadAllBytes;
+
     FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator)
       throws IOException {
       this.fileSize = readerContext.getFileSize();
@@ -1361,6 +1363,7 @@ static class FSReaderImpl implements FSReader {
       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
       defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
       encodedBlockDecodingCtx = defaultDecodingCtx;
+      isPreadAllBytes = readerContext.isPreadAllBytes();
     }
 
     @Override
@@ -1449,7 +1452,9 @@ protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int siz
       } else {
         // Positional read. Better for random reads; or when the streamLock is already locked.
         int extraSize = peekIntoNextBlock ? hdrSize : 0;
-        if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) {
+        if (
+          !BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes)
+        ) {
           // did not read the next block header.
           return false;
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java
index c652c4a18b53..d6f711d866eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -39,6 +40,7 @@ public enum ReaderType {
   private final HFileSystem hfs;
   private final boolean primaryReplicaReader;
   private final ReaderType type;
+  private final boolean preadAllBytes;
 
   public ReaderContext(Path filePath, FSDataInputStreamWrapper fsdis, long fileSize,
     HFileSystem hfs, boolean primaryReplicaReader, ReaderType type) {
@@ -48,6 +50,8 @@ public ReaderContext(Path filePath, FSDataInputStreamWrapper fsdis, long fileSiz
     this.hfs = hfs;
     this.primaryReplicaReader = primaryReplicaReader;
     this.type = type;
+    this.preadAllBytes = hfs.getConf().getBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY,
+      HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT);
   }
 
   public Path getFilePath() {
@@ -73,4 +77,8 @@ public boolean isPrimaryReplicaReader() {
   public ReaderType getReaderType() {
     return this.type;
   }
+
+  public boolean isPreadAllBytes() {
+    return preadAllBytes;
+  }
 }
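
Note: a hedged sketch of how the flag propagates: ReaderContext picks up
hfile.pread.all.bytes.enabled from the file system's configuration and FSReaderImpl consults
isPreadAllBytes() for every positional read; the helper class below and its fs/path/fileSize
parameters are hypothetical:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
    import org.apache.hadoop.hbase.io.hfile.ReaderContext;
    import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;

    final class ReaderContextSketch {
      // Hypothetical helper: build a pread-type reader context and report the derived flag.
      static boolean isPreadAllBytesFor(FileSystem fs, Path path, long fileSize) throws IOException {
        ReaderContext context = new ReaderContextBuilder()
          .withInputStreamWrapper(new FSDataInputStreamWrapper(fs.open(path)))
          .withReaderType(ReaderContext.ReaderType.PREAD)
          .withFileSize(fileSize)
          .withFilePath(path)
          .withFileSystem(fs)
          .build();
        // preadAllBytes was resolved from the HFileSystem's configuration in the constructor.
        return context.isPreadAllBytes();
      }
    }
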
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
index 00a38ee22729..01cb9cb7e173 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -26,14 +27,22 @@
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.util.BlockIOUtils;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
@@ -41,11 +50,13 @@
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
 
 @Category({ IOTests.class, SmallTests.class })
 public class TestBlockIOUtils {
@@ -54,11 +65,17 @@ public class TestBlockIOUtils {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestBlockIOUtils.class);
 
+  @Rule
+  public TestName testName = new TestName();
+
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
+  private static final int NUM_TEST_BLOCKS = 2;
+  private static final Compression.Algorithm COMPRESSION_ALGO = Compression.Algorithm.GZ;
+
   @Test
   public void testIsByteBufferReadable() throws IOException {
     FileSystem fs = TEST_UTIL.getTestFileSystem();
@@ -89,6 +106,103 @@ public void testReadFully() throws IOException {
     assertArrayEquals(Bytes.toBytes(s), heapBuf);
   }
 
+  @Test
+  public void testPreadWithReadFullBytes() throws IOException {
+    testPreadReadFullBytesInternal(true, EnvironmentEdgeManager.currentTime());
+  }
+
+  @Test
+  public void testPreadWithoutReadFullBytes() throws IOException {
+    testPreadReadFullBytesInternal(false, EnvironmentEdgeManager.currentTime());
+  }
+
+  private void testPreadReadFullBytesInternal(boolean readAllBytes, long randomSeed)
+    throws IOException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, readAllBytes);
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path path = new Path(TEST_UTIL.getDataTestDirOnTestFS(), testName.getMethodName());
+    // pass the seed explicitly so that a failure can be reproduced easily.
+    Random rand = new Random(randomSeed);
+    long totalDataBlockBytes =
+      writeBlocks(TEST_UTIL.getConfiguration(), rand, COMPRESSION_ALGO, path);
+    readDataBlocksAndVerify(fs, path, COMPRESSION_ALGO, totalDataBlockBytes);
+  }
+
+  private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo,
+    Path path) throws IOException {
+    FileSystem fs = HFileSystem.get(conf);
+    FSDataOutputStream os = fs.create(path);
+    HFileContext meta =
+      new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
+    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
+    long totalDataBlockBytes = 0;
+    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
+      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
+      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
+        blockTypeOrdinal = BlockType.DATA.ordinal();
+      }
+      BlockType bt = BlockType.values()[blockTypeOrdinal];
+      DataOutputStream dos = hbw.startWriting(bt);
+      int size = rand.nextInt(500);
+      for (int j = 0; j < size; ++j) {
+        dos.writeShort(i + 1);
+        dos.writeInt(j + 1);
+      }
+
+      hbw.writeHeaderAndData(os);
+      totalDataBlockBytes += hbw.getOnDiskSizeWithHeader();
+    }
+    // append a dummy trailer; an actual HFile would have more data after the data blocks.
+    FixedFileTrailer trailer = new FixedFileTrailer(3, 3);
+    trailer.setFirstDataBlockOffset(0);
+    trailer.setLastDataBlockOffset(totalDataBlockBytes);
+    trailer.setComparatorClass(meta.getCellComparator().getClass());
+    trailer.setDataIndexCount(NUM_TEST_BLOCKS);
+    trailer.setCompressionCodec(compressAlgo);
+    trailer.serialize(os);
+    // close the stream
+    os.close();
+    return totalDataBlockBytes;
+  }
+
+  private void readDataBlocksAndVerify(FileSystem fs, Path path, Compression.Algorithm compressAlgo,
+    long totalDataBlockBytes) throws IOException {
+    FSDataInputStream is = fs.open(path);
+    HFileContext fileContext =
+      new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
+    ReaderContext context =
+      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+        .withReaderType(ReaderContext.ReaderType.PREAD).withFileSize(totalDataBlockBytes)
+        .withFilePath(path).withFileSystem(fs).build();
+    HFileBlock.FSReader hbr =
+      new HFileBlock.FSReaderImpl(context, fileContext, ByteBuffAllocator.HEAP);
+
+    long onDiskSizeOfNextBlock = -1;
+    long offset = 0;
+    int numOfReadBlock = 0;
+    // offset and totalDataBlockBytes follow the same loop logic as in HFilePreadReader
+    while (offset < totalDataBlockBytes) {
+      HFileBlock block = hbr.readBlockData(offset, onDiskSizeOfNextBlock, true, false, false);
+      numOfReadBlock++;
+      try {
+        onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
+        offset += block.getOnDiskSizeWithHeader();
+      } finally {
+        block.release();
+      }
+    }
+    assertEquals(totalDataBlockBytes, offset);
+    assertEquals(NUM_TEST_BLOCKS, numOfReadBlock);
+    deleteFile(fs, path);
+  }
+
+  private void deleteFile(FileSystem fs, Path path) throws IOException {
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+  }
+
   @Test
   public void testReadWithExtra() throws IOException {
     FileSystem fs = TEST_UTIL.getTestFileSystem();