From 833b10e8bab7c28457caa854c0f714b489f88fa3 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Sat, 11 Feb 2023 19:34:17 +0800 Subject: [PATCH] HBASE-27621 Also clear the Dictionary when resetting when reading compressed WAL file (#5016) Signed-off-by: Xiaolin Ha --- .../hbase/io/TagCompressionContext.java | 4 +- .../hadoop/hbase/io/util/StreamUtils.java | 17 ++ .../hbase/regionserver/wal/Compressor.java | 4 +- .../regionserver/wal/ProtobufLogReader.java | 16 ++ .../hbase/regionserver/wal/ReaderBase.java | 13 +- .../hbase/regionserver/wal/WALCellCodec.java | 12 +- .../TestWALEntryStreamCompressionReset.java | 251 ++++++++++++++++++ 7 files changed, 308 insertions(+), 9 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamCompressionReset.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java index 74b0f2db108c..f938fdaab35b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java @@ -107,7 +107,7 @@ public void uncompressTags(InputStream src, byte[] dest, int offset, int length) throws IOException { int endOffset = offset + length; while (offset < endOffset) { - byte status = (byte) src.read(); + byte status = StreamUtils.readByte(src); if (status == Dictionary.NOT_IN_DICTIONARY) { int tagLen = StreamUtils.readRawVarint32(src); offset = Bytes.putAsShort(dest, offset, tagLen); @@ -115,7 +115,7 @@ public void uncompressTags(InputStream src, byte[] dest, int offset, int length) tagDict.addEntry(dest, offset, tagLen); offset += tagLen; } else { - short dictIdx = StreamUtils.toShort(status, (byte) src.read()); + short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(src)); byte[] entry = tagDict.getEntry(dictIdx); if (entry == null) { throw new IOException("Missing dictionary entry for index " + dictIdx); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java index 97e1e9d3345a..0bda535d6b13 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.util; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -206,6 +207,22 @@ public static Pair readRawVarint32(ByteBuffer input, int offse return new Pair<>(result, newOffset - offset); } + /** + * Read a byte from the given stream using the read method, and throw EOFException if it returns + * -1, like the implementation in {@code DataInputStream}. + *

+ * This is useful because casting the return value of read method into byte directly will make us + * lose the ability to check whether there is a byte and its value is -1 or we reach EOF, as + * casting int -1 to byte also returns -1. + */ + public static byte readByte(InputStream in) throws IOException { + int r = in.read(); + if (r < 0) { + throw new EOFException(); + } + return (byte) r; + } + public static short toShort(byte hi, byte lo) { short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF)); Preconditions.checkArgument(s >= 0); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java index d283a19e45fd..bed31530d878 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java @@ -108,7 +108,9 @@ static byte[] readCompressed(DataInput in, Dictionary dict) throws IOException { // if this isn't in the dictionary, we need to add to the dictionary. byte[] arr = new byte[length]; in.readFully(arr); - if (dict != null) dict.addEntry(arr, 0, length); + if (dict != null) { + dict.addEntry(arr, 0, length); + } return arr; } else { // Status here is the higher-order byte of index of the dictionary entry diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 12f0efc57282..2e3fb4f5b5f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -98,6 +98,9 @@ public class ProtobufLogReader extends ReaderBase { // cell codec classname private String codecClsName = null; + // a flag indicate that whether we need to reset compression context when seeking back + private boolean resetCompression; + @InterfaceAudience.Private public long trailerSize() { if (trailerPresent) { @@ -160,6 +163,9 @@ public long getPosition() throws IOException { @Override public void reset() throws IOException { String clsName = initInternal(null, false); + if (resetCompression) { + resetCompression(); + } initAfterCompression(clsName); // We need a new decoder (at least). } @@ -361,6 +367,8 @@ protected boolean readNext(Entry entry) throws IOException { WALKey.Builder builder = WALKey.newBuilder(); long size = 0; boolean resetPosition = false; + // by default, we should reset the compression when seeking back after reading something + resetCompression = true; try { long available = -1; try { @@ -372,6 +380,14 @@ protected boolean readNext(Entry entry) throws IOException { // available may be < 0 on local fs for instance. If so, can't depend on it. available = this.inputStream.available(); if (available > 0 && available < size) { + // if we quit here, we have just read the length, no actual data yet, which means we + // haven't put anything into the compression dictionary yet, so when seeking back to the + // last good position, we do not need to reset compression context. + // This is very useful for saving the extra effort for reconstructing the compression + // dictionary, where we need to read from the beginning instead of just seek to the + // position, as DFSInputStream implement the available method, so in most cases we will + // reach here if there are not enough data. + resetCompression = false; throw new EOFException("Available stream not enough for edit, " + "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= " + size + " at offset = " + this.inputStream.getPos()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index 5e14c475ae33..5caceeac09b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -45,7 +45,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader { * Compression context to use reading. Can be null if no compression. */ protected CompressionContext compressionContext = null; - protected boolean emptyCompressionContext = true; + private boolean emptyCompressionContext = true; /** * Default constructor. @@ -130,6 +130,17 @@ public void seek(long pos) throws IOException { seekOnFs(pos); } + /** + * Clear the {@link ReaderBase#compressionContext}, and also set {@link #emptyCompressionContext} + * to true, so when seeking, we will try to skip to the position and reconstruct the dictionary. + */ + protected final void resetCompression() { + if (compressionContext != null) { + compressionContext.clear(); + emptyCompressionContext = true; + } + } + /** * Initializes the log reader with a particular stream (may be null). Reader assumes ownership of * the stream if not null and may use it. Called once. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 5b60b10e128b..816ce3ed45ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -197,18 +197,20 @@ public byte[] uncompress(ByteString data, Enum dictIndex) { private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException { InputStream in = bs.newInput(); - byte status = (byte) in.read(); + byte status = StreamUtils.readByte(in); if (status == Dictionary.NOT_IN_DICTIONARY) { byte[] arr = new byte[StreamUtils.readRawVarint32(in)]; int bytesRead = in.read(arr); if (bytesRead != arr.length) { throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead); } - if (dict != null) dict.addEntry(arr, 0, arr.length); + if (dict != null) { + dict.addEntry(arr, 0, arr.length); + } return arr; } else { // Status here is the higher-order byte of index of the dictionary entry. - short dictIdx = StreamUtils.toShort(status, (byte) in.read()); + short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in)); byte[] entry = dict.getEntry(dictIdx); if (entry == null) { throw new IOException("Missing dictionary entry for index " + dictIdx); @@ -350,7 +352,7 @@ protected Cell parseCell() throws IOException { } private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException { - byte status = (byte) in.read(); + byte status = StreamUtils.readByte(in); if (status == Dictionary.NOT_IN_DICTIONARY) { // status byte indicating that data to be read is not in dictionary. // if this isn't in the dictionary, we need to add to the dictionary. @@ -360,7 +362,7 @@ private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOExcep return length; } else { // the status byte also acts as the higher order byte of the dictionary entry. - short dictIdx = StreamUtils.toShort(status, (byte) in.read()); + short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in)); byte[] entry = dict.getEntry(dictIdx); if (entry == null) { throw new IOException("Missing dictionary entry for index " + dictIdx); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamCompressionReset.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamCompressionReset.java new file mode 100644 index 000000000000..9fc97bba3019 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamCompressionReset.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableMap; +import java.util.OptionalLong; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; + +/** + * Enable compression and reset the WALEntryStream while reading in ReplicationSourceWALReader. + *

+ * This is used to confirm that we can work well when hitting EOFException in the middle when + * reading a WAL entry, when compression is enabled. See HBASE-27621 for more details. + */ +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestWALEntryStreamCompressionReset { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALEntryStreamCompressionReset.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + + private static TableName TABLE_NAME = TableName.valueOf("reset"); + + private static RegionInfo REGION_INFO = RegionInfoBuilder.newBuilder(TABLE_NAME).build(); + + private static byte[] FAMILY = Bytes.toBytes("family"); + + private static MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl(); + + private static NavigableMap SCOPE; + + private static String GROUP_ID = "group"; + + private static FileSystem FS; + + private static ReplicationSource SOURCE; + + private static MetricsSource METRICS_SOURCE; + + private static ReplicationSourceLogQueue LOG_QUEUE; + + private static Path TEMPLATE_WAL_FILE; + + private static int END_OFFSET_OF_WAL_ENTRIES; + + private static Path WAL_FILE; + + private static volatile long WAL_LENGTH; + + private static ReplicationSourceWALReader READER; + + // return the wal path, and also the end offset of last wal entry + private static Pair generateWAL() throws Exception { + Path path = UTIL.getDataTestDir("wal"); + ProtobufLogWriter writer = new ProtobufLogWriter(); + writer.init(FS, path, UTIL.getConfiguration(), false, FS.getDefaultBlockSize(path), null); + for (int i = 0; i < Byte.MAX_VALUE; i++) { + WALEdit edit = new WALEdit(); + edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) + .setRow(Bytes.toBytes(i)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier-" + i)) + .setValue(Bytes.toBytes("v-" + i)).build()); + writer.append(new WAL.Entry(new WALKeyImpl(REGION_INFO.getEncodedNameAsBytes(), TABLE_NAME, + EnvironmentEdgeManager.currentTime(), MVCC, SCOPE), edit)); + } + + WALEdit edit2 = new WALEdit(); + edit2.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) + .setRow(Bytes.toBytes(-1)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier")) + .setValue(Bytes.toBytes("vv")).build()); + edit2.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) + .setRow(Bytes.toBytes(-1)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier-1")) + .setValue(Bytes.toBytes("vvv")).build()); + writer.append(new WAL.Entry(new WALKeyImpl(REGION_INFO.getEncodedNameAsBytes(), TABLE_NAME, + EnvironmentEdgeManager.currentTime(), MVCC, SCOPE), edit2)); + writer.sync(false); + long offset = writer.getSyncedLength(); + writer.close(); + return Pair.newPair(path, offset); + } + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = UTIL.getConfiguration(); + FS = UTIL.getTestFileSystem(); + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false); + conf.setInt("replication.source.maxretriesmultiplier", 1); + FS.mkdirs(UTIL.getDataTestDir()); + Pair pair = generateWAL(); + TEMPLATE_WAL_FILE = pair.getFirst(); + END_OFFSET_OF_WAL_ENTRIES = pair.getSecond().intValue(); + WAL_FILE = UTIL.getDataTestDir("rep_source"); + + METRICS_SOURCE = new MetricsSource("reset"); + SOURCE = mock(ReplicationSource.class); + when(SOURCE.isPeerEnabled()).thenReturn(true); + when(SOURCE.getWALFileLengthProvider()).thenReturn(p -> OptionalLong.of(WAL_LENGTH)); + when(SOURCE.getServerWALsBelongTo()) + .thenReturn(ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime())); + when(SOURCE.getSourceMetrics()).thenReturn(METRICS_SOURCE); + ReplicationSourceManager rsm = mock(ReplicationSourceManager.class); + when(rsm.getTotalBufferUsed()).thenReturn(new AtomicLong()); + when(rsm.getTotalBufferLimit()) + .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + when(rsm.getGlobalMetrics()).thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); + when(SOURCE.getSourceManager()).thenReturn(rsm); + + LOG_QUEUE = new ReplicationSourceLogQueue(conf, METRICS_SOURCE, SOURCE); + LOG_QUEUE.enqueueLog(WAL_FILE, GROUP_ID); + READER = new ReplicationSourceWALReader(FS, conf, LOG_QUEUE, 0, e -> e, SOURCE, GROUP_ID); + } + + @AfterClass + public static void tearDown() throws Exception { + READER.setReaderRunning(false); + READER.join(); + UTIL.cleanupTestDir(); + } + + private void test(byte[] content, FSDataOutputStream out) throws Exception { + // minus 15 so the second entry is incomplete + // 15 is a magic number here, we want the reader parse the first cell but not the second cell, + // especially not the qualifier of the second cell. The value of the second cell is 'vvv', which + // is 3 bytes, plus 8 bytes timestamp, and also qualifier, family and row(which should have been + // compressed), so 15 is a proper value, of course 14 or 16 could also work here. + out.write(content, 0, END_OFFSET_OF_WAL_ENTRIES - 15); + out.hflush(); + WAL_LENGTH = END_OFFSET_OF_WAL_ENTRIES - 15; + READER.start(); + List entries = new ArrayList<>(); + for (;;) { + WALEntryBatch batch = READER.poll(1000); + if (batch == null) { + break; + } + entries.addAll(batch.getWalEntries()); + } + // should return all the entries except the last one + assertEquals(Byte.MAX_VALUE, entries.size()); + for (int i = 0; i < Byte.MAX_VALUE; i++) { + WAL.Entry entry = entries.get(i); + assertEquals(1, entry.getEdit().size()); + Cell cell = entry.getEdit().getCells().get(0); + assertEquals(i, Bytes.toInt(cell.getRowArray(), cell.getRowOffset())); + assertEquals(Bytes.toString(FAMILY), + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())); + assertEquals("qualifier-" + i, Bytes.toString(cell.getQualifierArray(), + cell.getQualifierOffset(), cell.getQualifierLength())); + assertEquals("v-" + i, + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); + } + + // confirm that we can not get the last one since it is incomplete + assertNull(READER.poll(1000)); + // write the last byte out + out.write(content, END_OFFSET_OF_WAL_ENTRIES - 15, 15); + out.hflush(); + WAL_LENGTH = END_OFFSET_OF_WAL_ENTRIES; + + // should get the last entry + WALEntryBatch batch = READER.poll(10000); + assertEquals(1, batch.getNbEntries()); + WAL.Entry entry = batch.getWalEntries().get(0); + assertEquals(2, entry.getEdit().size()); + Cell cell2 = entry.getEdit().getCells().get(0); + assertEquals(-1, Bytes.toInt(cell2.getRowArray(), cell2.getRowOffset())); + assertEquals(Bytes.toString(FAMILY), + Bytes.toString(cell2.getFamilyArray(), cell2.getFamilyOffset(), cell2.getFamilyLength())); + assertEquals("qualifier", Bytes.toString(cell2.getQualifierArray(), cell2.getQualifierOffset(), + cell2.getQualifierLength())); + assertEquals("vv", + Bytes.toString(cell2.getValueArray(), cell2.getValueOffset(), cell2.getValueLength())); + + Cell cell3 = entry.getEdit().getCells().get(1); + assertEquals(-1, Bytes.toInt(cell3.getRowArray(), cell3.getRowOffset())); + assertEquals(Bytes.toString(FAMILY), + Bytes.toString(cell3.getFamilyArray(), cell3.getFamilyOffset(), cell3.getFamilyLength())); + assertEquals("qualifier-1", Bytes.toString(cell3.getQualifierArray(), + cell3.getQualifierOffset(), cell3.getQualifierLength())); + assertEquals("vvv", + Bytes.toString(cell3.getValueArray(), cell3.getValueOffset(), cell3.getValueLength())); + } + + @Test + public void testReset() throws Exception { + byte[] content; + try (FSDataInputStream in = FS.open(TEMPLATE_WAL_FILE)) { + content = ByteStreams.toByteArray(in); + } + try (FSDataOutputStream out = FS.create(WAL_FILE)) { + test(content, out); + } + } +}