Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27033 Backport "HBASE-27013 Introduce read all bytes when using… #4429

Merged
merged 1 commit into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,13 @@ public enum OperationStatusCode {
public static final long HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT =
32 * 1024 * 1024L;

/**
* Configuration key for setting pread must read both necessaryLen and extraLen, default is
* disabled. This is an optimized flag for reading HFile from blob storage.
*/
public static final String HFILE_PREAD_ALL_BYTES_ENABLED_KEY = "hfile.pread.all.bytes.enabled";
public static final boolean HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT = false;

/*
* Minimum percentage of free heap necessary for a successful cluster startup.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,31 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec
*/
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
int necessaryLen, int extraLen) throws IOException {
return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false);
}

/**
* Read from an input stream at least <code>necessaryLen</code> and if possible,
* <code>extraLen</code> also if available. Analogous to
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
* specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
* read.
* @param buff ByteBuff to read into.
* @param dis the input stream to read from
* @param position the position within the stream from which to start reading
* @param necessaryLen the number of bytes that are absolutely necessary to read
* @param extraLen the number of extra bytes that would be nice to read
* @param readAllBytes whether we must read the necessaryLen and extraLen
* @return true if and only if extraLen is > 0 and reading those extra bytes was successful
* @throws IOException if failed to read the necessary bytes
*/
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
int remain = necessaryLen + extraLen;
byte[] buf = new byte[remain];
int bytesRead = 0;
while (bytesRead < necessaryLen) {
int lengthMustRead = readAllBytes ? remain : necessaryLen;
while (bytesRead < lengthMustRead) {
int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
if (ret < 0) {
throw new IOException("Premature EOF from inputStream (positional read returned " + ret
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,8 @@ static class FSReaderImpl implements FSReader {

private final Lock streamLock = new ReentrantLock();

private final boolean isPreadAllBytes;

FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator)
throws IOException {
this.fileSize = readerContext.getFileSize();
Expand All @@ -1361,6 +1363,7 @@ static class FSReaderImpl implements FSReader {
this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
encodedBlockDecodingCtx = defaultDecodingCtx;
isPreadAllBytes = readerContext.isPreadAllBytes();
}

@Override
Expand Down Expand Up @@ -1449,7 +1452,9 @@ protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int siz
} else {
// Positional read. Better for random reads; or when the streamLock is already locked.
int extraSize = peekIntoNextBlock ? hdrSize : 0;
if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) {
if (
!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes)
) {
Comment on lines +1455 to +1457
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (
!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes)
) {
if (!BlockIOUtils.preadWithExtra(
dest, istream, fileOffset, size, extraSize, isPreadAllBytes)) {

Typically, we break long lines with method invocations around the arguments, rather than break up the if condition itself.

However, if checkstyle is happy with this how it is, then it's fine :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that was modified by spotless:apply, and yet the checkstyle was happy about it

// did not read the next block header.
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.io.hfile;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -39,6 +40,7 @@ public enum ReaderType {
private final HFileSystem hfs;
private final boolean primaryReplicaReader;
private final ReaderType type;
private final boolean preadAllBytes;

public ReaderContext(Path filePath, FSDataInputStreamWrapper fsdis, long fileSize,
HFileSystem hfs, boolean primaryReplicaReader, ReaderType type) {
Expand All @@ -48,6 +50,8 @@ public ReaderContext(Path filePath, FSDataInputStreamWrapper fsdis, long fileSiz
this.hfs = hfs;
this.primaryReplicaReader = primaryReplicaReader;
this.type = type;
this.preadAllBytes = hfs.getConf().getBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY,
HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT);
}

public Path getFilePath() {
Expand All @@ -73,4 +77,8 @@ public boolean isPrimaryReplicaReader() {
public ReaderType getReaderType() {
return this.type;
}

public boolean isPreadAllBytes() {
return preadAllBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand All @@ -26,26 +27,36 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;

@Category({ IOTests.class, SmallTests.class })
public class TestBlockIOUtils {
Expand All @@ -54,11 +65,17 @@ public class TestBlockIOUtils {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBlockIOUtils.class);

@Rule
public TestName testName = new TestName();

@Rule
public ExpectedException exception = ExpectedException.none();

private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

private static final int NUM_TEST_BLOCKS = 2;
private static final Compression.Algorithm COMPRESSION_ALGO = Compression.Algorithm.GZ;

@Test
public void testIsByteBufferReadable() throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Expand Down Expand Up @@ -89,6 +106,103 @@ public void testReadFully() throws IOException {
assertArrayEquals(Bytes.toBytes(s), heapBuf);
}

@Test
public void testPreadWithReadFullBytes() throws IOException {
testPreadReadFullBytesInternal(true, EnvironmentEdgeManager.currentTime());
}

@Test
public void testPreadWithoutReadFullBytes() throws IOException {
testPreadReadFullBytesInternal(false, EnvironmentEdgeManager.currentTime());
}

private void testPreadReadFullBytesInternal(boolean readAllBytes, long randomSeed)
throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, readAllBytes);
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path path = new Path(TEST_UTIL.getDataTestDirOnTestFS(), testName.getMethodName());
// give a fixed seed such we can see failure easily.
Random rand = new Random(randomSeed);
long totalDataBlockBytes =
writeBlocks(TEST_UTIL.getConfiguration(), rand, COMPRESSION_ALGO, path);
readDataBlocksAndVerify(fs, path, COMPRESSION_ALGO, totalDataBlockBytes);
}

private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo,
Path path) throws IOException {
FileSystem fs = HFileSystem.get(conf);
FSDataOutputStream os = fs.create(path);
HFileContext meta =
new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
long totalDataBlockBytes = 0;
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
blockTypeOrdinal = BlockType.DATA.ordinal();
}
BlockType bt = BlockType.values()[blockTypeOrdinal];
DataOutputStream dos = hbw.startWriting(bt);
int size = rand.nextInt(500);
for (int j = 0; j < size; ++j) {
dos.writeShort(i + 1);
dos.writeInt(j + 1);
}

hbw.writeHeaderAndData(os);
totalDataBlockBytes += hbw.getOnDiskSizeWithHeader();
}
// append a dummy trailer and in a actual HFile it should have more data.
FixedFileTrailer trailer = new FixedFileTrailer(3, 3);
trailer.setFirstDataBlockOffset(0);
trailer.setLastDataBlockOffset(totalDataBlockBytes);
trailer.setComparatorClass(meta.getCellComparator().getClass());
trailer.setDataIndexCount(NUM_TEST_BLOCKS);
trailer.setCompressionCodec(compressAlgo);
trailer.serialize(os);
// close the stream
os.close();
return totalDataBlockBytes;
}

private void readDataBlocksAndVerify(FileSystem fs, Path path, Compression.Algorithm compressAlgo,
long totalDataBlockBytes) throws IOException {
FSDataInputStream is = fs.open(path);
HFileContext fileContext =
new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
ReaderContext context =
new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
.withReaderType(ReaderContext.ReaderType.PREAD).withFileSize(totalDataBlockBytes)
.withFilePath(path).withFileSystem(fs).build();
HFileBlock.FSReader hbr =
new HFileBlock.FSReaderImpl(context, fileContext, ByteBuffAllocator.HEAP);

long onDiskSizeOfNextBlock = -1;
long offset = 0;
int numOfReadBlock = 0;
// offset and totalBytes shares the same logic in the HFilePreadReader
while (offset < totalDataBlockBytes) {
HFileBlock block = hbr.readBlockData(offset, onDiskSizeOfNextBlock, true, false, false);
numOfReadBlock++;
try {
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
} finally {
block.release();
}
}
assertEquals(totalDataBlockBytes, offset);
assertEquals(NUM_TEST_BLOCKS, numOfReadBlock);
deleteFile(fs, path);
}

private void deleteFile(FileSystem fs, Path path) throws IOException {
if (fs.exists(path)) {
fs.delete(path, true);
}
}

@Test
public void testReadWithExtra() throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Expand Down