HBASE-27013 Introduce read all bytes when using pread for prefetch #4414

Merged (2 commits) on May 13, 2022
@@ -1011,6 +1011,13 @@ public enum OperationStatusCode {
public static final long HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT =
32 * 1024 * 1024L;

/**
* Configuration key that forces pread to read both necessaryLen and extraLen; disabled by
* default. This is an optimization flag for reading HFiles from blob storage.
*/
public static final String HFILE_PREAD_ALL_BYTES_ENABLED_KEY = "hfile.pread.all.bytes.enabled";
public static final boolean HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT = false;

/*
* Minimum percentage of free heap necessary for a successful cluster startup.
*/
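
For orientation, here is a minimal sketch of how a client or test could enable the new flag programmatically; the wrapper class and main method are illustrative only, while the property name and constants come from the hunk above (the same value can be set as hfile.pread.all.bytes.enabled=true in hbase-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class EnablePreadAllBytesExample {
  public static void main(String[] args) {
    // Programmatic equivalent of setting hfile.pread.all.bytes.enabled=true in hbase-site.xml.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, true);
    // Readers consult the flag with the default as fallback; prints "true" here.
    System.out.println(conf.getBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY,
      HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT));
  }
}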
@@ -228,21 +228,43 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec
*/
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
int necessaryLen, int extraLen) throws IOException {
return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false);
}

/**
* Read from an input stream at least <code>necessaryLen</code> bytes and, if available, up to
* <code>extraLen</code> additional bytes. Analogous to
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
* specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
* read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
* directly, and does not allocate a temporary byte array.
* @param buff ByteBuff to read into.
* @param dis the input stream to read from
* @param position the position within the stream from which to start reading
* @param necessaryLen the number of bytes that are absolutely necessary to read
* @param extraLen the number of extra bytes that would be nice to read
* @param readAllBytes whether we must read the necessaryLen and extraLen
* @return true if and only if extraLen is > 0 and reading those extra bytes was successful
* @throws IOException if failed to read the necessary bytes
*/
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer");

if (preadbytebuffer) {
return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen);
return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes);
} else {
return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen);
return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes);
}
}

private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position,
int necessaryLen, int extraLen) throws IOException {
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
int remain = necessaryLen + extraLen;
byte[] buf = new byte[remain];
int bytesRead = 0;
while (bytesRead < necessaryLen) {
int lengthMustRead = readAllBytes ? remain : necessaryLen;
while (bytesRead < lengthMustRead) {
int ret = dis.read(position + bytesRead, buf, bytesRead, remain);

Review comment: any reason for not using dis.readFully() into the byte array?

if (ret < 0) {
throw new IOException("Premature EOF from inputStream (positional read returned " + ret
@@ -257,11 +279,12 @@ private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis
}

private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,

Review comment: in line 305 on this method, the NPE raised should be included as the cause of the IOE. That way, if the input stream code raises NPEs, debugging it becomes possible.

int necessaryLen, int extraLen) throws IOException {
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
ByteBuffer[] buffers = buff.nioByteBuffers();
ByteBuffer cur = buffers[idx];
while (bytesRead < necessaryLen) {
int lengthMustRead = readAllBytes ? remain : necessaryLen;
while (bytesRead < lengthMustRead) {
int ret;
while (!cur.hasRemaining()) {
if (++idx >= buffers.length) {
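To make the new flag's semantics concrete, below is a simplified, self-contained sketch (not the HBase implementation itself) of the positional-read loop: with readAllBytes=false the loop may return once necessaryLen bytes have arrived, while with readAllBytes=true it keeps issuing preads until necessaryLen + extraLen bytes are in the buffer or EOF is reached. The reviewer's readFully question refers to FSDataInputStream.readFully(position, buffer, offset, length), which loops internally in a similar way but always insists on the full requested length.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;

public final class PreadSemanticsSketch {
  /**
   * Simplified sketch of the positional-read loop (illustrative, not the HBase code).
   * Returns the number of bytes actually read into buf, starting at offset 0.
   */
  static int pread(FSDataInputStream dis, long position, byte[] buf, int necessaryLen,
      int extraLen, boolean readAllBytes) throws IOException {
    int remain = necessaryLen + extraLen;
    int bytesRead = 0;
    // The only behavioral change of the flag: how many bytes the loop insists on.
    int lengthMustRead = readAllBytes ? remain : necessaryLen;
    while (bytesRead < lengthMustRead) {
      int ret = dis.read(position + bytesRead, buf, bytesRead, remain - bytesRead);
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream (positional read returned " + ret
            + ", was trying to read " + necessaryLen + " necessary and " + extraLen
            + " extra bytes)");
      }
      bytesRead += ret;
    }
    return bytesRead;
  }
}

Note that even with readAllBytes=false the sketch can return more than necessaryLen bytes, since a single pread may already pull in part of the extra region.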
@@ -1453,7 +1453,12 @@ protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int siz
} else {
// Positional read. Better for random reads; or when the streamLock is already locked.
int extraSize = peekIntoNextBlock ? hdrSize : 0;
if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) {
boolean readAllBytes =
hfs.getConf().getBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY,
HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT);
Review comment: Thinking some more, we should really do this once and not for every call to readAtOffset(..). What about moving this logic into the ReaderContext and caching it there to avoid the expensive lookup into the Hadoop Configuration object?

if (
!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, readAllBytes)
) {
// did not read the next block header.
return false;
}
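
As a follow-up to the caching suggestion above, here is one hypothetical shape it could take; the class and method names are illustrative and not part of this PR, the point being that the Hadoop Configuration is consulted exactly once when the reader is constructed rather than on every readAtOffset(..) call:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

/**
 * Hypothetical sketch of the reviewer's suggestion (not the actual ReaderContext code):
 * resolve the flag once when the reader context is built and reuse the cached value.
 */
public final class CachedPreadFlag {
  private final boolean preadAllBytes;

  public CachedPreadFlag(Configuration conf) {
    // Single, up-front lookup into the relatively expensive Hadoop Configuration.
    this.preadAllBytes = conf.getBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY,
      HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT);
  }

  public boolean isPreadAllBytes() {
    return preadAllBytes;
  }
}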
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -28,15 +29,23 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
@@ -49,6 +58,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;

@Category({ IOTests.class, SmallTests.class })
public class TestBlockIOUtils {
@@ -57,11 +67,17 @@ public class TestBlockIOUtils {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBlockIOUtils.class);

@Rule
public TestName testName = new TestName();

@Rule
public ExpectedException exception = ExpectedException.none();

private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

private static final int NUM_TEST_BLOCKS = 2;
private static final Compression.Algorithm COMPRESSION_ALGO = Compression.Algorithm.GZ;

@Test
public void testIsByteBufferReadable() throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
@@ -92,6 +108,102 @@ public void testReadFully() throws IOException {
assertArrayEquals(Bytes.toBytes(s), heapBuf);
}

@Test
public void testPreadWithReadFullBytes() throws IOException {
testPreadReadFullBytesInternal(true);
}

@Test
public void testPreadWithoutReadFullBytes() throws IOException {
testPreadReadFullBytesInternal(false);
}

private void testPreadReadFullBytesInternal(boolean readAllBytes) throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, readAllBytes);
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path path = new Path(TEST_UTIL.getDataTestDirOnTestFS(), testName.getMethodName());
// use a fixed seed so failures are easy to reproduce.
Random rand = new Random(5685632);
Review comment (nit): What about choosing a random seed (maybe System.currentTimeMillis()) in testPreadWithoutReadFullBytes() and testPreadWithReadFullBytes() and passing that seed into testPreadReadFullBytesInternal(..)? As long as we log the seed we get the benefit of randomly choosing different data every time, but a human can come back and replay the same exact seed (as we'd have the seed logged in surefire-reports).

long totalDataBlockBytes =
writeBlocks(TEST_UTIL.getConfiguration(), rand, COMPRESSION_ALGO, path);
readDataBlocksAndVerify(fs, path, COMPRESSION_ALGO, totalDataBlockBytes);
}
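
A hedged sketch of the seed-logging idea from the review comment above; the class, logger, and helper names are illustrative, not part of the PR:

import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical sketch of the reviewer's seed-logging suggestion (names are illustrative). */
public final class SeededDataGenerator {
  private static final Logger LOG = LoggerFactory.getLogger(SeededDataGenerator.class);

  /** Pick a time-based seed, log it so surefire-reports captures it, and return a replayable Random. */
  static Random newLoggedRandom() {
    long seed = System.currentTimeMillis();
    LOG.info("Using random seed {}", seed);
    return new Random(seed);
  }

  public static void main(String[] args) {
    Random rand = newLoggedRandom();
    System.out.println(rand.nextInt(500)); // different data each run, replayable via the logged seed
  }
}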

private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo,
Path path) throws IOException {
FileSystem fs = HFileSystem.get(conf);
FSDataOutputStream os = fs.create(path);

Review comment: safer within try/finally or try-with-resources.

HFileContext meta =
new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta);
long totalDataBlockBytes = 0;
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
blockTypeOrdinal = BlockType.DATA.ordinal();
}
BlockType bt = BlockType.values()[blockTypeOrdinal];
DataOutputStream dos = hbw.startWriting(bt);
int size = rand.nextInt(500);
for (int j = 0; j < size; ++j) {
dos.writeShort(i + 1);
dos.writeInt(j + 1);
}

hbw.writeHeaderAndData(os);
totalDataBlockBytes += hbw.getOnDiskSizeWithHeader();
}
// append a dummy trailer; an actual HFile would have more data before the trailer.
FixedFileTrailer trailer = new FixedFileTrailer(3, 3);
trailer.setFirstDataBlockOffset(0);
trailer.setLastDataBlockOffset(totalDataBlockBytes);
trailer.setComparatorClass(meta.getCellComparator().getClass());
trailer.setDataIndexCount(NUM_TEST_BLOCKS);
trailer.setCompressionCodec(compressAlgo);
trailer.serialize(os);
// close the stream
os.close();
return totalDataBlockBytes;
}
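
For the stream-handling comment above, a small illustrative sketch of the try-with-resources variant (the class and method names are assumptions, not the test code itself):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative sketch of the reviewer's try-with-resources suggestion. */
public final class SafeStreamWrite {
  static void writeWithAutoClose(Configuration conf, Path path, byte[] data) throws Exception {
    FileSystem fs = FileSystem.get(conf);
    // FSDataOutputStream implements Closeable, so the stream is closed even if the
    // block-writing logic inside the try block throws, avoiding a leaked file handle.
    try (FSDataOutputStream os = fs.create(path)) {
      os.write(data);
    }
  }
}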

private void readDataBlocksAndVerify(FileSystem fs, Path path, Compression.Algorithm compressAlgo,
long totalDataBlockBytes) throws IOException {
FSDataInputStream is = fs.open(path);
HFileContext fileContext =
new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
ReaderContext context =
new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
.withReaderType(ReaderContext.ReaderType.PREAD).withFileSize(totalDataBlockBytes)
.withFilePath(path).withFileSystem(fs).build();
HFileBlock.FSReader hbr =
new HFileBlock.FSReaderImpl(context, fileContext, ByteBuffAllocator.HEAP, fs.getConf());

long onDiskSizeOfNextBlock = -1;
long offset = 0;
int numOfReadBlock = 0;
// offset and totalBytes share the same logic as in the HFilePreadReader
while (offset < totalDataBlockBytes) {
HFileBlock block = hbr.readBlockData(offset, onDiskSizeOfNextBlock, true, false, false);
numOfReadBlock++;
try {
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
} finally {
block.release();
}
}
assertEquals(totalDataBlockBytes, offset);
assertEquals(NUM_TEST_BLOCKS, numOfReadBlock);
deleteFile(fs, path);
}

private void deleteFile(FileSystem fs, Path path) throws IOException {
if (fs.exists(path)) {
fs.delete(path, true);
}
}

@Test
public void testReadWithExtra() throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();