diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index c8f9dd7c0103..d34417f701db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -392,12 +392,12 @@ static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final /** * Parse total on disk size including header and checksum. - * @param headerBuf Header ByteBuffer. Presumed exact size of header. - * @param verifyChecksum true if checksum verification is in use. + * @param headerBuf Header ByteBuffer. Presumed exact size of header. + * @param checksumSupport true if checksum verification is in use. * @return Size of the block with header included. */ - private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean verifyChecksum) { - return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum); + private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean checksumSupport) { + return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(checksumSupport); } /** @@ -1597,33 +1597,48 @@ public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean } /** - * Returns Check onDiskSizeWithHeaderL size is healthy and then return it as an int + * Check that {@code value} read from a block header seems reasonable, within a large margin of + * error. + * @return {@code true} if the value is safe to proceed, {@code false} otherwise. */ - private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize) - throws IOException { - if ( - (onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1) - || onDiskSizeWithHeaderL >= Integer.MAX_VALUE - ) { - throw new IOException( - "Invalid onDisksize=" + onDiskSizeWithHeaderL + ": expected to be at least " + hdrSize - + " and at most " + Integer.MAX_VALUE + ", or -1"); + private boolean checkOnDiskSizeWithHeader(int value) { + if (value < 0) { + if (LOG.isTraceEnabled()) { + LOG.trace( + "onDiskSizeWithHeader={}; value represents a size, so it should never be negative.", + value); + } + return false; + } + if (value - hdrSize < 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("onDiskSizeWithHeader={}, hdrSize={}; don't accept a value that is negative" + + " after the header size is excluded.", value, hdrSize); + } + return false; } - return (int) onDiskSizeWithHeaderL; + return true; } /** - * Verify the passed in onDiskSizeWithHeader aligns with what is in the header else something is - * not right. + * Check that {@code value} provided by the calling context seems reasonable, within a large + * margin of error. + * @return {@code true} if the value is safe to proceed, {@code false} otherwise. */ - private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf, - final long offset, boolean verifyChecksum) throws IOException { - // Assert size provided aligns with what is in the header - int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum); - if (passedIn != fromHeader) { - throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader - + ", offset=" + offset + ", fileContext=" + this.fileContext); + private boolean checkCallerProvidedOnDiskSizeWithHeader(long value) { + // same validation logic as is used by Math.toIntExact(long) + int intValue = (int) value; + if (intValue != value) { + if (LOG.isTraceEnabled()) { + LOG.trace("onDiskSizeWithHeaderL={}; value exceeds int size limits.", value); + } + return false; + } + if (intValue == -1) { + // a magic value we expect to see. + return true; } + return checkOnDiskSizeWithHeader(intValue); } /** @@ -1654,14 +1669,16 @@ private void cacheNextBlockHeader(final long offset, ByteBuff onDiskBlock, this.prefetchedHeader.set(ph); } - private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock, - int onDiskSizeWithHeader) { - int nextBlockOnDiskSize = -1; - if (readNextHeader) { - nextBlockOnDiskSize = - onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) + hdrSize; - } - return nextBlockOnDiskSize; + /** + * Clear the cached value when its integrity is suspect. + */ + private void invalidateNextBlockHeader() { + prefetchedHeader.set(null); + } + + private int getNextBlockOnDiskSize(ByteBuff onDiskBlock, int onDiskSizeWithHeader) { + return onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) + + hdrSize; } private ByteBuff allocate(int size, boolean intoHeap) { @@ -1687,17 +1704,21 @@ private ByteBuff allocate(int size, boolean intoHeap) { protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics, boolean intoHeap) throws IOException { + final Span span = Span.current(); + final AttributesBuilder attributesBuilder = Attributes.builder(); + Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY)) + .ifPresent(c -> c.accept(attributesBuilder)); if (offset < 0) { throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")"); } + if (!checkCallerProvidedOnDiskSizeWithHeader(onDiskSizeWithHeaderL)) { + LOG.trace("Caller provided invalid onDiskSizeWithHeaderL={}", onDiskSizeWithHeaderL); + onDiskSizeWithHeaderL = -1; + } + int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL; - final Span span = Span.current(); - final AttributesBuilder attributesBuilder = Attributes.builder(); - Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY)) - .ifPresent(c -> c.accept(attributesBuilder)); - int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize); - // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1 + // Try to use the cached header. Will serve us in rare case where onDiskSizeWithHeaderL==-1 // and will save us having to seek the stream backwards to reread the header we // read the last time through here. ByteBuff headerBuf = getCachedHeader(offset); @@ -1711,8 +1732,8 @@ protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, // file has support for checksums (version 2+). boolean checksumSupport = this.fileContext.isUseHBaseChecksum(); long startTime = EnvironmentEdgeManager.currentTime(); - if (onDiskSizeWithHeader <= 0) { - // We were not passed the block size. Need to get it from the header. If header was + if (onDiskSizeWithHeader == -1) { + // The caller does not know the block size. Need to get it from the header. If header was // not cached (see getCachedHeader above), need to seek to pull it in. This is costly // and should happen very rarely. Currently happens on open of a hfile reader where we // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes @@ -1729,6 +1750,19 @@ protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, } onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport); } + + // The common case is that onDiskSizeWithHeader was produced by a read without checksum + // validation, so give it a sanity check before trying to use it. + if (!checkOnDiskSizeWithHeader(onDiskSizeWithHeader)) { + if (verifyChecksum) { + invalidateNextBlockHeader(); + span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build()); + return null; + } else { + throw new IOException("Invalid onDiskSizeWithHeader=" + onDiskSizeWithHeader); + } + } + int preReadHeaderSize = headerBuf == null ? 0 : hdrSize; // Allocate enough space to fit the next block's header too; saves a seek next time through. // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header; @@ -1745,19 +1779,49 @@ protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, boolean readNextHeader = readAtOffset(is, onDiskBlock, onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); onDiskBlock.rewind(); // in case of moving position when copying a cached header - int nextBlockOnDiskSize = - getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader); + + // the call to validateChecksum for this block excludes the next block header over-read, so + // no reason to delay extracting this value. + int nextBlockOnDiskSize = -1; + if (readNextHeader) { + int parsedVal = getNextBlockOnDiskSize(onDiskBlock, onDiskSizeWithHeader); + if (checkOnDiskSizeWithHeader(parsedVal)) { + nextBlockOnDiskSize = parsedVal; + } + } if (headerBuf == null) { headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize); } - // Do a few checks before we go instantiate HFileBlock. - assert onDiskSizeWithHeader > this.hdrSize; - verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport); + ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader); // Verify checksum of the data before using it for building HFileBlock. if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) { + invalidateNextBlockHeader(); + span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build()); return null; } + + // TODO: is this check necessary or can we proceed with a provided value regardless of + // what is in the header? + int fromHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport); + if (onDiskSizeWithHeader != fromHeader) { + if (LOG.isTraceEnabled()) { + LOG.trace("Passed in onDiskSizeWithHeader={} != {}, offset={}, fileContext={}", + onDiskSizeWithHeader, fromHeader, offset, this.fileContext); + } + if (checksumSupport && verifyChecksum) { + // This file supports HBase checksums and verification of those checksums was + // requested. The block size provided by the caller (presumably from the block index) + // does not match the block size written to the block header. treat this as + // HBase-checksum failure. + span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build()); + invalidateNextBlockHeader(); + return null; + } + throw new IOException("Passed in onDiskSizeWithHeader=" + onDiskSizeWithHeader + " != " + + fromHeader + ", offset=" + offset + ", fileContext=" + this.fileContext); + } + // remove checksum from buffer now that it's verified int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX); curBlock.limit(sizeWithoutChecksum); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index baaefb0c0f89..053679952091 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -61,7 +61,7 @@ public class TestChecksum { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestChecksum.class); - private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class); + private static final Logger LOG = LoggerFactory.getLogger(TestChecksum.class); static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index 0d5190d16b35..189af113b334 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -163,12 +163,7 @@ public void testReaderWithoutBlockCache() throws Exception { fillByteBuffAllocator(alloc, bufCount); // start write to store file. Path path = writeStoreFile(); - try { - readStoreFile(path, conf, alloc); - } catch (Exception e) { - // fail test - assertTrue(false); - } + readStoreFile(path, conf, alloc); Assert.assertEquals(bufCount, alloc.getFreeBufferCount()); alloc.clean(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockHeaderCorruption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockHeaderCorruption.java new file mode 100644 index 000000000000..f349adf92006 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockHeaderCorruption.java @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.time.Instant; +import java.util.LinkedList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Random; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilder; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExternalResource; +import org.junit.rules.RuleChain; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This test provides coverage for HFileHeader block fields that are read and interpreted before + * HBase checksum validation can be applied. As of now, this is just + * {@code onDiskSizeWithoutHeader}. + */ +@Category({ IOTests.class, SmallTests.class }) +public class TestHFileBlockHeaderCorruption { + + private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlockHeaderCorruption.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHFileBlockHeaderCorruption.class); + + private final HFileTestRule hFileTestRule; + + @Rule + public final RuleChain ruleChain; + + public TestHFileBlockHeaderCorruption() throws IOException { + TestName testName = new TestName(); + hFileTestRule = new HFileTestRule(new HBaseTestingUtility(), testName); + ruleChain = RuleChain.outerRule(testName).around(hFileTestRule); + } + + @Test + public void testOnDiskSizeWithoutHeaderCorruptionFirstBlock() throws Exception { + HFileBlockChannelPosition firstBlock = null; + try { + try (HFileBlockChannelPositionIterator it = + new HFileBlockChannelPositionIterator(hFileTestRule)) { + assertTrue(it.hasNext()); + firstBlock = it.next(); + } + + Corrupter c = new Corrupter(firstBlock); + + logHeader(firstBlock); + c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX, + ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE))); + logHeader(firstBlock); + try (HFileBlockChannelPositionIterator it = + new HFileBlockChannelPositionIterator(hFileTestRule)) { + CountingConsumer consumer = new CountingConsumer(it); + try { + consumer.readFully(); + fail(); + } catch (Exception e) { + assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class) + .withMessage(startsWith("Invalid onDiskSizeWithHeader="))); + } + assertEquals(0, consumer.getItemsRead()); + } + + c.restore(); + c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX, + ByteBuffer.wrap(Bytes.toBytes(0))); + logHeader(firstBlock); + try (HFileBlockChannelPositionIterator it = + new HFileBlockChannelPositionIterator(hFileTestRule)) { + CountingConsumer consumer = new CountingConsumer(it); + try { + consumer.readFully(); + fail(); + } catch (Exception e) { + assertThat(e, new IsThrowableMatching().withInstanceOf(IllegalArgumentException.class)); + } + assertEquals(0, consumer.getItemsRead()); + } + + c.restore(); + c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX, + ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE))); + logHeader(firstBlock); + try (HFileBlockChannelPositionIterator it = + new HFileBlockChannelPositionIterator(hFileTestRule)) { + CountingConsumer consumer = new CountingConsumer(it); + try { + consumer.readFully(); + fail(); + } catch (Exception e) { + assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class) + .withMessage(startsWith("Invalid onDiskSizeWithHeader="))); + } + assertEquals(0, consumer.getItemsRead()); + } + } finally { + if (firstBlock != null) { + firstBlock.close(); + } + } + } + + @Test + public void testOnDiskSizeWithoutHeaderCorruptionSecondBlock() throws Exception { + HFileBlockChannelPosition secondBlock = null; + try { + try (HFileBlockChannelPositionIterator it = + new HFileBlockChannelPositionIterator(hFileTestRule)) { + assertTrue(it.hasNext()); + it.next(); + assertTrue(it.hasNext()); + secondBlock = it.next(); + } + + Corrupter c = new Corrupter(secondBlock); + + logHeader(secondBlock); + c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX, + ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE))); + logHeader(secondBlock); + try (HFileBlockChannelPositionIterator it = + new HFileBlockChannelPositionIterator(hFileTestRule)) { + CountingConsumer consumer = new CountingConsumer(it); + try { + consumer.readFully(); + fail(); + } catch (Exception e) { + assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class) + .withMessage(startsWith("Invalid onDiskSizeWithHeader="))); + } + assertEquals(1, consumer.getItemsRead()); + } + + c.restore(); + c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX, + ByteBuffer.wrap(Bytes.toBytes(0))); + logHeader(secondBlock); + try (HFileBlockChannelPositionIterator it = + new HFileBlockChannelPositionIterator(hFileTestRule)) { + CountingConsumer consumer = new CountingConsumer(it); + try { + consumer.readFully(); + fail(); + } catch (Exception e) { + assertThat(e, new IsThrowableMatching().withInstanceOf(IllegalArgumentException.class)); + } + assertEquals(1, consumer.getItemsRead()); + } + + c.restore(); + c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX, + ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE))); + logHeader(secondBlock); + try (HFileBlockChannelPositionIterator it = + new HFileBlockChannelPositionIterator(hFileTestRule)) { + CountingConsumer consumer = new CountingConsumer(it); + try { + consumer.readFully(); + fail(); + } catch (Exception e) { + assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class) + .withMessage(startsWith("Invalid onDiskSizeWithHeader="))); + } + assertEquals(1, consumer.getItemsRead()); + } + } finally { + if (secondBlock != null) { + secondBlock.close(); + } + } + } + + private static void logHeader(HFileBlockChannelPosition hbcp) throws IOException { + ByteBuff buf = ByteBuff.wrap(ByteBuffer.allocate(HFileBlock.headerSize(true))); + hbcp.rewind(); + assertEquals(buf.capacity(), buf.read(hbcp.getChannel())); + buf.rewind(); + hbcp.rewind(); + logHeader(buf); + } + + private static void logHeader(ByteBuff buf) { + byte[] blockMagic = new byte[8]; + buf.get(blockMagic); + int onDiskSizeWithoutHeader = buf.getInt(); + int uncompressedSizeWithoutHeader = buf.getInt(); + long prevBlockOffset = buf.getLong(); + byte checksumType = buf.get(); + int bytesPerChecksum = buf.getInt(); + int onDiskDataSizeWithHeader = buf.getInt(); + LOG.debug( + "blockMagic={}, onDiskSizeWithoutHeader={}, uncompressedSizeWithoutHeader={}, " + + "prevBlockOffset={}, checksumType={}, bytesPerChecksum={}, onDiskDataSizeWithHeader={}", + Bytes.toStringBinary(blockMagic), onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, + prevBlockOffset, checksumType, bytesPerChecksum, onDiskDataSizeWithHeader); + } + + /** + * Data class to enabled messing with the bytes behind an {@link HFileBlock}. + */ + public static class HFileBlockChannelPosition implements Closeable { + private final SeekableByteChannel channel; + private final long position; + + public HFileBlockChannelPosition(SeekableByteChannel channel, long position) { + this.channel = channel; + this.position = position; + } + + public SeekableByteChannel getChannel() { + return channel; + } + + public long getPosition() { + return position; + } + + public void rewind() throws IOException { + channel.position(position); + } + + @Override + public void close() throws IOException { + channel.close(); + } + } + + /** + * Reads blocks off of an {@link HFileBlockChannelPositionIterator}, counting them as it does. + */ + public static class CountingConsumer { + private final HFileBlockChannelPositionIterator iterator; + private int itemsRead = 0; + + public CountingConsumer(HFileBlockChannelPositionIterator iterator) { + this.iterator = iterator; + } + + public int getItemsRead() { + return itemsRead; + } + + public Object readFully() throws IOException { + Object val = null; + for (itemsRead = 0; iterator.hasNext(); itemsRead++) { + val = iterator.next(); + } + return val; + } + } + + /** + * A simplified wrapper over an {@link HFileBlock.BlockIterator} that looks a lot like an + * {@link java.util.Iterator}. + */ + public static class HFileBlockChannelPositionIterator implements Closeable { + + private final HFileTestRule hFileTestRule; + private final HFile.Reader reader; + private final HFileBlock.BlockIterator iter; + private HFileBlockChannelPosition current = null; + + public HFileBlockChannelPositionIterator(HFileTestRule hFileTestRule) throws IOException { + Configuration conf = hFileTestRule.getConfiguration(); + HFileSystem hfs = hFileTestRule.getHFileSystem(); + Path hfsPath = hFileTestRule.getPath(); + + HFile.Reader reader = null; + HFileBlock.BlockIterator iter = null; + try { + reader = HFile.createReader(hfs, hfsPath, CacheConfig.DISABLED, true, conf); + HFileBlock.FSReader fsreader = reader.getUncachedBlockReader(); + iter = fsreader.blockRange(0, hfs.getFileStatus(hfsPath).getLen()); + } catch (IOException e) { + if (reader != null) { + closeQuietly(reader::close); + } + throw e; + } + + this.hFileTestRule = hFileTestRule; + this.reader = reader; + this.iter = iter; + } + + public boolean hasNext() throws IOException { + HFileBlock next = iter.nextBlock(); + SeekableByteChannel channel = hFileTestRule.getRWChannel(); + if (next != null) { + current = new HFileBlockChannelPosition(channel, next.getOffset()); + } + return next != null; + } + + public HFileBlockChannelPosition next() { + if (current == null) { + throw new NoSuchElementException(); + } + HFileBlockChannelPosition ret = current; + current = null; + return ret; + } + + @Override + public void close() throws IOException { + if (current != null) { + closeQuietly(current::close); + } + closeQuietly(reader::close); + } + + @FunctionalInterface + private interface CloseMethod { + void run() throws IOException; + } + + private static void closeQuietly(CloseMethod closeMethod) { + try { + closeMethod.run(); + } catch (Throwable e) { + LOG.debug("Ignoring thrown exception.", e); + } + } + } + + /** + * Enables writing and rewriting portions of the file backing an {@link HFileBlock}. + */ + public static class Corrupter { + + private final HFileBlockChannelPosition channelAndPosition; + private final ByteBuffer originalHeader; + + public Corrupter(HFileBlockChannelPosition channelAndPosition) throws IOException { + this.channelAndPosition = channelAndPosition; + this.originalHeader = readHeaderData(channelAndPosition); + } + + private static ByteBuffer readHeaderData(HFileBlockChannelPosition channelAndPosition) + throws IOException { + SeekableByteChannel channel = channelAndPosition.getChannel(); + ByteBuffer originalHeader = ByteBuffer.allocate(HFileBlock.headerSize(true)); + channelAndPosition.rewind(); + channel.read(originalHeader); + return originalHeader; + } + + public void write(int offset, ByteBuffer src) throws IOException { + SeekableByteChannel channel = channelAndPosition.getChannel(); + long position = channelAndPosition.getPosition(); + channel.position(position + offset); + channel.write(src); + } + + public void restore() throws IOException { + SeekableByteChannel channel = channelAndPosition.getChannel(); + originalHeader.rewind(); + channelAndPosition.rewind(); + assertEquals(originalHeader.capacity(), channel.write(originalHeader)); + } + } + + public static class HFileTestRule extends ExternalResource { + + private final HBaseTestingUtility testingUtility; + private final HFileSystem hfs; + private final HFileContext context; + private final TestName testName; + private Path path; + + public HFileTestRule(HBaseTestingUtility testingUtility, TestName testName) throws IOException { + this.testingUtility = testingUtility; + this.testName = testName; + this.hfs = (HFileSystem) HFileSystem.get(testingUtility.getConfiguration()); + this.context = + new HFileContextBuilder().withBlockSize(4 * 1024).withHBaseCheckSum(true).build(); + } + + public Configuration getConfiguration() { + return testingUtility.getConfiguration(); + } + + public HFileSystem getHFileSystem() { + return hfs; + } + + public HFileContext getHFileContext() { + return context; + } + + public Path getPath() { + return path; + } + + public SeekableByteChannel getRWChannel() throws IOException { + java.nio.file.Path p = FileSystems.getDefault().getPath(path.toString()); + return Files.newByteChannel(p, StandardOpenOption.READ, StandardOpenOption.WRITE, + StandardOpenOption.DSYNC); + } + + @Override + protected void before() throws Throwable { + this.path = new Path(testingUtility.getDataTestDirOnTestFS(), testName.getMethodName()); + HFile.WriterFactory factory = + HFile.getWriterFactory(testingUtility.getConfiguration(), CacheConfig.DISABLED) + .withPath(hfs, path).withFileContext(context); + + CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY); + Random rand = new Random(Instant.now().toEpochMilli()); + byte[] family = Bytes.toBytes("f"); + try (HFile.Writer writer = factory.create()) { + for (int i = 0; i < 40; i++) { + byte[] row = RandomKeyValueUtil.randomOrderedFixedLengthKey(rand, i, 100); + byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand); + byte[] value = RandomKeyValueUtil.randomValue(rand); + Cell cell = cellBuilder.setType(Cell.Type.Put).setRow(row).setFamily(family) + .setQualifier(qualifier).setValue(value).build(); + writer.append(cell); + cellBuilder.clear(); + } + } + } + } + + /** + * A Matcher implementation that can make basic assertions over a provided {@link Throwable}. + * Assertion failures include the full stacktrace in their description. + */ + private static final class IsThrowableMatching extends TypeSafeMatcher { + + private final List> requirements = new LinkedList<>(); + + public IsThrowableMatching withInstanceOf(Class type) { + requirements.add(instanceOf(type)); + return this; + } + + public IsThrowableMatching withMessage(Matcher matcher) { + requirements.add(hasProperty("message", matcher)); + return this; + } + + @Override + protected boolean matchesSafely(Throwable throwable) { + return allOf(requirements).matches(throwable); + } + + @Override + protected void describeMismatchSafely(Throwable item, Description mismatchDescription) { + allOf(requirements).describeMismatch(item, mismatchDescription); + // would be nice if `item` could be provided as the cause of the AssertionError instead. + mismatchDescription.appendText(String.format("%nProvided: ")); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + try (PrintStream ps = new PrintStream(baos, false, StandardCharsets.UTF_8.name())) { + item.printStackTrace(ps); + ps.flush(); + } + mismatchDescription.appendText(baos.toString(StandardCharsets.UTF_8.name())); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void describeTo(Description description) { + description.appendDescriptionOf(allOf(requirements)); + } + } +}