HBASE-27232 Fix checking for encoded block size when deciding if bloc… (#4640)

Signed-off-by: Andor Molnár <[email protected]>
Signed-off-by: Bryan Beaudreault <[email protected]>
Signed-off-by: Ankit Singhal <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
wchevreuil authored Jul 25, 2022
1 parent b1706a8 commit d5ed8f5
Showing 2 changed files with 64 additions and 7 deletions.
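
The substantive behavior change: the default for hbase.writer.unified.encoded.blocksize.ratio (UNIFIED_ENCODED_BLOCKSIZE_RATIO) drops from 1 to 0, so encodedBlockSizeLimit is now 0 unless the ratio is configured explicitly, and checkBlockBoundary() falls back to checking both the encoded and unencoded sizes against the block size. As a minimal sketch of opting back into the old ratio-driven behavior, assuming only the standard Hadoop Configuration API (the wrapper class name is invented for illustration):

import org.apache.hadoop.conf.Configuration;

public class EncodedBlocksizeRatioExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Setting the ratio to 1.0 restores the pre-patch behavior: blocks are
    // finished purely on encoded size, with the limit at 1.0 * blocksize.
    conf.setFloat("hbase.writer.unified.encoded.blocksize.ratio", 1.0f);
    // Leaving the property unset now yields a ratio of 0, which makes the
    // writer compare both encoded and unencoded sizes against the blocksize.
    System.out.println(conf.getFloat("hbase.writer.unified.encoded.blocksize.ratio", 0f));
  }
}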
HFileWriterImpl.java
@@ -172,8 +172,9 @@ public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path pat
     }
     closeOutputStream = path != null;
     this.cacheConf = cacheConf;
-    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 1f);
+    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
     this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);
+
     finishInit(conf);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: "
@@ -309,12 +310,16 @@ protected void finishInit(final Configuration conf) {
    * At a block boundary, write all the inline blocks and opens new block.
    */
   protected void checkBlockBoundary() throws IOException {
-    // For encoder like prefixTree, encoded size is not available, so we have to compare both
-    // encoded size and unencoded size to blocksize limit.
-    if (
-      blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit
-        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize()
-    ) {
+    boolean shouldFinishBlock = false;
+    // This means hbase.writer.unified.encoded.blocksize.ratio was set to something different from 0
+    // and we should use the encoding ratio
+    if (encodedBlockSizeLimit > 0) {
+      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit;
+    } else {
+      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
+        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
+    }
+    if (shouldFinishBlock) {
       finishBlock();
       writeInlineBlocks(false);
       newBlock();
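For readability outside the diff, here is a self-contained sketch of the new boundary decision as a pure function; the standalone class, parameter names, and sample sizes are illustrative rather than HBase code (65536 bytes is HBase's default block size):

public class BlockBoundarySketch {

  // Mirrors the logic added to HFileWriterImpl.checkBlockBoundary() above.
  static boolean shouldFinishBlock(int encodedBlockSizeLimit, int encodedSizeWritten,
    int unencodedSizeWritten, int blocksize) {
    if (encodedBlockSizeLimit > 0) {
      // The ratio was set explicitly, so the encoded size alone decides.
      return encodedSizeWritten >= encodedBlockSizeLimit;
    }
    // Default (ratio 0): finish once either measure reaches the block size.
    return encodedSizeWritten >= blocksize || unencodedSizeWritten >= blocksize;
  }

  public static void main(String[] args) {
    int blocksize = 65536;
    // Ratio at its new default of 0: a block whose raw size has reached the
    // block size is finished even though its encoded size is still small.
    System.out.println(shouldFinishBlock(0, 30000, 65536, blocksize)); // true
    // Ratio 1.0 (limit == blocksize): only the encoded size matters.
    System.out.println(shouldFinishBlock(65536, 30000, 65536, blocksize)); // false
  }
}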
TestHStoreFile.java
@@ -66,6 +66,9 @@
 import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
@@ -1141,4 +1144,53 @@ public void testDataBlockEncodingMetaData() throws IOException {
     byte[] value = fileInfo.get(HFileDataBlockEncoder.DATA_BLOCK_ENCODING);
     assertArrayEquals(dataBlockEncoderAlgo.getNameInBytes(), value);
   }
+
+  @Test
+  public void testDataBlockSizeEncoded() throws Exception {
+    // Make up a directory hierarchy that has a regiondir ("7e0102") and familyname.
+    Path dir = new Path(new Path(this.testDir, "7e0102"), "familyname");
+    Path path = new Path(dir, "1234567890");
+
+    DataBlockEncoding dataBlockEncoderAlgo = DataBlockEncoding.FAST_DIFF;
+
+    conf.setDouble("hbase.writer.unified.encoded.blocksize.ratio", 1);
+
+    cacheConf = new CacheConfig(conf);
+    HFileContext meta =
+      new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).withChecksumType(CKTYPE)
+        .withBytesPerCheckSum(CKBYTES).withDataBlockEncoding(dataBlockEncoderAlgo).build();
+    // Make a store file and write data to it.
+    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
+      .withFilePath(path).withMaxKeyCount(2000).withFileContext(meta).build();
+    writeStoreFile(writer);
+
+    HStoreFile storeFile =
+      new HStoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, true);
+    storeFile.initReader();
+    StoreFileReader reader = storeFile.getReader();
+
+    Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+    byte[] value = fileInfo.get(HFileDataBlockEncoder.DATA_BLOCK_ENCODING);
+    assertEquals(dataBlockEncoderAlgo.name(), Bytes.toString(value));
+
+    HFile.Reader fReader =
+      HFile.createReader(fs, writer.getPath(), storeFile.getCacheConf(), true, conf);
+
+    FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, writer.getPath());
+    long fileSize = fs.getFileStatus(writer.getPath()).getLen();
+    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
+    long offset = trailer.getFirstDataBlockOffset(), max = trailer.getLastDataBlockOffset();
+    HFileBlock block;
+    while (offset <= max) {
+      block = fReader.readBlock(offset, -1, /* cacheBlock */
+        false, /* pread */ false, /* isCompaction */ false, /* updateCacheMetrics */
+        false, null, null);
+      offset += block.getOnDiskSizeWithHeader();
+      double diff = block.getOnDiskSizeWithHeader() - BLOCKSIZE_SMALL;
+      if (offset <= max) {
+        assertTrue(diff >= 0 && diff < (BLOCKSIZE_SMALL * 0.05));
+      }
+    }
+  }
+
 }
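
The assertion inside the loop permits each block, except possibly the last one, to exceed BLOCKSIZE_SMALL by less than 5%. A small worked example of that tolerance, assuming a hypothetical 8 KiB value for BLOCKSIZE_SMALL (the real constant is defined elsewhere in the test class):

public class BlockSizeToleranceExample {
  public static void main(String[] args) {
    int blocksizeSmall = 8 * 1024; // assumed value, for illustration only
    double maxDiff = blocksizeSmall * 0.05; // 409.6 bytes of allowed overshoot
    int onDiskSizeWithHeader = 8600; // a hypothetical block size read from disk
    double diff = onDiskSizeWithHeader - blocksizeSmall;
    // Mirrors the test: a non-final block must not undershoot the target block
    // size and may overshoot it by less than 5% (header plus encoding slack).
    System.out.println(diff >= 0 && diff < maxDiff); // true: 408 < 409.6
  }
}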
