From b60fecd18da58c40368e9051be749c42d107be5f Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Wed, 15 Feb 2023 12:00:08 +0800 Subject: [PATCH] HBASE-27637 Zero length value would cause value compressor read nothing and not advance the position of the InputStream (#5025) Signed-off-by: Xiaolin Ha Signed-off-by: Andrew Purtell (cherry picked from commit 2bbe036e2999f886008bb83988c541eb879e2431) --- .../regionserver/wal/CompressionContext.java | 18 ++++--- .../hbase/regionserver/wal/WALCellCodec.java | 6 +-- .../hbase/wal/CompressedWALTestBase.java | 52 ++++++++++++------- .../TestCompressedWALValueCompression.java | 3 +- 4 files changed, 46 insertions(+), 33 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index 0ac0ce0d4fb8..73cf4821db00 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -25,6 +25,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.EnumMap; import java.util.Map; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream; @@ -105,7 +106,7 @@ public byte[] compress(byte[] valueArray, int valueOffset, int valueLength) thro return compressed; } - public int decompress(InputStream in, int inLength, byte[] outArray, int outOffset, + public void decompress(InputStream in, int inLength, byte[] outArray, int outOffset, int outLength) throws IOException { // Our input is a sequence of bounded byte ranges (call them segments), with @@ -122,11 +123,16 @@ public int decompress(InputStream in, int inLength, byte[] outArray, int outOffs } else { lowerIn.setDelegate(in, inLength); } - - // Caller must handle short reads. - // With current Hadoop compression codecs all 'outLength' bytes are read in here, so not - // an issue for now. - return compressedIn.read(outArray, outOffset, outLength); + if (outLength == 0) { + // The BufferedInputStream will return earlier and skip reading anything if outLength == 0, + // but in fact for an empty value, the compressed output still contains some metadata so the + // compressed size is not 0, so here we need to manually skip inLength bytes otherwise the + // next read on this stream will start from an invalid position and cause critical problem, + // such as data loss when splitting wal or replicating wal. + IOUtils.skipFully(in, inLength); + } else { + IOUtils.readFully(compressedIn, outArray, outOffset, outLength); + } } public void clear() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 816ce3ed45ad..84709cbc58dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -382,13 +382,9 @@ private static void checkLength(int len, int max) throws IOException { private void readCompressedValue(InputStream in, byte[] outArray, int outOffset, int expectedLength) throws IOException { int compressedLen = StreamUtils.readRawVarint32(in); - int read = compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset, + compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset, expectedLength); - if (read != expectedLength) { - throw new IOException("ValueCompressor state error: short read"); - } } - } public static class EnsureKvEncoder extends BaseEncoder { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java index 2f577bfa78ed..b5f7dc634c75 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.wal; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -26,8 +28,9 @@ import java.util.TreeMap; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; @@ -82,33 +85,42 @@ public void doTest(TableName tableName) throws Exception { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); - kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); + kvs.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) + .setRow(row).setFamily(family).setQualifier(Bytes.toBytes(i)).setValue(value).build()); + kvs.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) + .setType(Cell.Type.DeleteFamily).setRow(row).setFamily(family).build()); wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), kvs); + wal.sync(); } - wal.sync(); final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); wals.shutdown(); // Confirm the WAL can be read back - WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath); - int count = 0; - WAL.Entry entry = new WAL.Entry(); - while (reader.next(entry) != null) { - count++; - List cells = entry.getEdit().getCells(); - assertTrue("Should be one KV per WALEdit", cells.size() == 1); - for (Cell cell : cells) { - assertTrue("Incorrect row", Bytes.equals(cell.getRowArray(), cell.getRowOffset(), - cell.getRowLength(), row, 0, row.length)); - assertTrue("Incorrect family", Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(), - cell.getFamilyLength(), family, 0, family.length)); - assertTrue("Incorrect value", Bytes.equals(cell.getValueArray(), cell.getValueOffset(), - cell.getValueLength(), value, 0, value.length)); + try (WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath)) { + int count = 0; + WAL.Entry entry = new WAL.Entry(); + while (reader.next(entry) != null) { + count++; + List cells = entry.getEdit().getCells(); + assertThat("Should be two KVs per WALEdit", cells, hasSize(2)); + Cell putCell = cells.get(0); + assertEquals(Cell.Type.Put, putCell.getType()); + assertTrue("Incorrect row", Bytes.equals(putCell.getRowArray(), putCell.getRowOffset(), + putCell.getRowLength(), row, 0, row.length)); + assertTrue("Incorrect family", Bytes.equals(putCell.getFamilyArray(), + putCell.getFamilyOffset(), putCell.getFamilyLength(), family, 0, family.length)); + assertTrue("Incorrect value", Bytes.equals(putCell.getValueArray(), + putCell.getValueOffset(), putCell.getValueLength(), value, 0, value.length)); + + Cell deleteCell = cells.get(1); + assertEquals(Cell.Type.DeleteFamily, deleteCell.getType()); + assertTrue("Incorrect row", Bytes.equals(deleteCell.getRowArray(), + deleteCell.getRowOffset(), deleteCell.getRowLength(), row, 0, row.length)); + assertTrue("Incorrect family", Bytes.equals(deleteCell.getFamilyArray(), + deleteCell.getFamilyOffset(), deleteCell.getFamilyLength(), family, 0, family.length)); } + assertEquals("Should have read back as many KVs as written", total, count); } - assertEquals("Should have read back as many KVs as written", total, count); - reader.close(); } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWALValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWALValueCompression.java index 2c35b0accc6a..88c0c889ade4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWALValueCompression.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWALValueCompression.java @@ -45,7 +45,7 @@ public class TestCompressedWALValueCompression extends CompressedWALTestBase { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCompressedWALValueCompression.class); - @Parameters + @Parameters(name = "{index}: compression={0}") public static List params() { return HBaseTestingUtility.COMPRESSION_ALGORITHMS_PARAMETERIZED; } @@ -78,5 +78,4 @@ public void test() throws Exception { TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_")); doTest(tableName); } - }