diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java index ffb3647e6259..ced01187b69b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hbase.regionserver.storefiletracker; +import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.zip.CRC32; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -29,9 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; -import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; - import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList; /** @@ -42,18 +41,27 @@ * other file. *

* So in this way, we could avoid listing when we want to load the store file list file. + *

+ * To prevent loading partial file, we use the first 4 bytes as file length, and also append a 4 + * bytes crc32 checksum at the end. This is because the protobuf message parser sometimes can return + * without error on partial bytes if you stop at some special points, but the return message will + * have incorrect field value. We should try our best to prevent this happens because loading an + * incorrect store file list file usually leads to data loss. */ @InterfaceAudience.Private class StoreFileListFile { private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class); - private static final String TRACK_FILE_DIR = ".filelist"; + static final String TRACK_FILE_DIR = ".filelist"; private static final String TRACK_FILE = "f1"; private static final String TRACK_FILE_ROTATE = "f2"; + // 16 MB, which is big enough for a tracker file + private static final int MAX_FILE_SIZE = 16 * 1024 * 1024; + private final StoreContext ctx; private final Path trackFileDir; @@ -74,16 +82,26 @@ class StoreFileListFile { private StoreFileList load(Path path) throws IOException { FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); - byte[] bytes; + byte[] data; + int expectedChecksum; try (FSDataInputStream in = fs.open(path)) { - bytes = ByteStreams.toByteArray(in); + int length = in.readInt(); + if (length <= 0 || length > MAX_FILE_SIZE) { + throw new IOException("Invalid file length " + length + + ", either less than 0 or greater then max allowed size " + MAX_FILE_SIZE); + } + data = new byte[length]; + in.readFully(data); + expectedChecksum = in.readInt(); } - // Read all the bytes and then parse it, so we will only throw InvalidProtocolBufferException - // here. This is very important for upper layer to determine whether this is the normal case, - // where the file does not exist or is incomplete. If there is another type of exception, the - // upper layer should throw it out instead of just ignoring it, otherwise it will lead to data - // loss. - return StoreFileList.parseFrom(bytes); + CRC32 crc32 = new CRC32(); + crc32.update(data); + int calculatedChecksum = (int) crc32.getValue(); + if (expectedChecksum != calculatedChecksum) { + throw new IOException( + "Checksum mismatch, expected " + expectedChecksum + ", actual " + calculatedChecksum); + } + return StoreFileList.parseFrom(data); } private int select(StoreFileList[] lists) { @@ -101,9 +119,9 @@ StoreFileList load() throws IOException { for (int i = 0; i < 2; i++) { try { lists[i] = load(trackFiles[i]); - } catch (FileNotFoundException | InvalidProtocolBufferException e) { + } catch (FileNotFoundException | EOFException e) { // this is normal case, so use info and do not log stacktrace - LOG.info("Failed to load track file {}: {}", trackFiles[i], e); + LOG.info("Failed to load track file {}: {}", trackFiles[i], e.toString()); } } int winnerIndex = select(lists); @@ -124,10 +142,17 @@ void update(StoreFileList.Builder builder) throws IOException { // we need to call load first to load the prevTimestamp and also the next file load(); } - FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime()); + byte[] actualData = builder.setTimestamp(timestamp).build().toByteArray(); + CRC32 crc32 = new CRC32(); + crc32.update(actualData); + int checksum = (int) crc32.getValue(); + // 4 bytes length at the beginning, plus 4 bytes checksum + FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) { - builder.setTimestamp(timestamp).build().writeTo(out); + out.writeInt(actualData.length); + out.write(actualData); + out.writeInt(checksum); } // record timestamp prevTimestamp = timestamp; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java index 51ea9f58248a..bb9985e53143 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; -import org.apache.hadoop.hbase.regionserver.storefiletracker.TestStoreFileTracker; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -96,7 +96,7 @@ public void testCreateWithTrackImpl() throws Exception { final TableName tableName = TableName.valueOf(name.getMethodName()); ProcedureExecutor procExec = getMasterProcedureExecutor(); TableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, F1); - String trackerName = TestStoreFileTracker.class.getName(); + String trackerName = StoreFileTrackerForTest.class.getName(); htd = TableDescriptorBuilder.newBuilder(htd).setValue(TRACKER_IMPL, trackerName).build(); RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, null); long procId = ProcedureTestingUtility.submitAndWait(procExec, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java index 2cbfdea94ef7..703d6193e5e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.regionserver.storefiletracker.TestStoreFileTracker; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -86,13 +86,13 @@ public static void afterClass() throws Exception { @Before public void setup(){ - TestStoreFileTracker.clear(); + StoreFileTrackerForTest.clear(); } private TableName createTable(byte[] splitKey) throws IOException { TableDescriptor td = TableDescriptorBuilder.newBuilder(name.getTableName()) .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_NAME)) - .setValue(TRACKER_IMPL, TestStoreFileTracker.class.getName()).build(); + .setValue(TRACKER_IMPL, StoreFileTrackerForTest.class.getName()).build(); if (splitKey != null) { TEST_UTIL.getAdmin().createTable(td, new byte[][] { splitKey }); } else { @@ -247,7 +247,8 @@ private void validateDaughterRegionsFiles(HRegion region, String orignalFileName private void verifyFilesAreTracked(Path regionDir, FileSystem fs) throws Exception { for (FileStatus f : fs.listStatus(new Path(regionDir, FAMILY_NAME_STR))) { - assertTrue(TestStoreFileTracker.tracked(regionDir.getName(), FAMILY_NAME_STR, f.getPath())); + assertTrue( + StoreFileTrackerForTest.tracked(regionDir.getName(), FAMILY_NAME_STR, f.getPath())); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerForTest.java similarity index 91% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerForTest.java index c89e151b40c6..abef80acb9d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerForTest.java @@ -32,14 +32,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TestStoreFileTracker extends DefaultStoreFileTracker { +public class StoreFileTrackerForTest extends DefaultStoreFileTracker { - private static final Logger LOG = LoggerFactory.getLogger(TestStoreFileTracker.class); + private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerForTest.class); private static ConcurrentMap> trackedFiles = new ConcurrentHashMap<>(); private String storeId; - public TestStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) { + public StoreFileTrackerForTest(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) { super(conf, isPrimaryReplica, ctx); if (ctx != null && ctx.getRegionFileSystem() != null) { this.storeId = ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileListFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileListFile.java new file mode 100644 index 000000000000..2aba24b4a46f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileListFile.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.storefiletracker; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.StoreContext; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestStoreFileListFile { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestStoreFileListFile.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestStoreFileListFile.class); + + private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); + + private Path testDir; + + private StoreFileListFile storeFileListFile; + + @Rule + public TestName name = new TestName(); + + @Before + public void setUp() throws IOException { + testDir = UTIL.getDataTestDir(name.getMethodName()); + HRegionFileSystem hfs = mock(HRegionFileSystem.class); + when(hfs.getFileSystem()).thenReturn(FileSystem.get(UTIL.getConfiguration())); + StoreContext ctx = StoreContext.getBuilder().withFamilyStoreDirectoryPath(testDir) + .withRegionFileSystem(hfs).build(); + storeFileListFile = new StoreFileListFile(ctx); + } + + @AfterClass + public static void tearDown() { + UTIL.cleanupTestDir(); + } + + @Test + public void testEmptyLoad() throws IOException { + assertNull(storeFileListFile.load()); + } + + private FileStatus getOnlyTrackerFile(FileSystem fs) throws IOException { + return fs.listStatus(new Path(testDir, StoreFileListFile.TRACK_FILE_DIR))[0]; + } + + private byte[] readAll(FileSystem fs, Path file) throws IOException { + try (FSDataInputStream in = fs.open(file)) { + return ByteStreams.toByteArray(in); + } + } + + private void write(FileSystem fs, Path file, byte[] buf, int off, int len) throws IOException { + try (FSDataOutputStream out = fs.create(file, true)) { + out.write(buf, off, len); + } + } + + @Test + public void testLoadPartial() throws IOException { + StoreFileList.Builder builder = StoreFileList.newBuilder(); + storeFileListFile.update(builder); + FileSystem fs = FileSystem.get(UTIL.getConfiguration()); + FileStatus trackerFileStatus = getOnlyTrackerFile(fs); + // truncate it so we do not have enough data + LOG.info("Truncate file {} with size {} to {}", trackerFileStatus.getPath(), + trackerFileStatus.getLen(), trackerFileStatus.getLen() / 2); + byte[] content = readAll(fs, trackerFileStatus.getPath()); + write(fs, trackerFileStatus.getPath(), content, 0, content.length / 2); + assertNull(storeFileListFile.load()); + } + + private void writeInt(byte[] buf, int off, int value) { + byte[] b = Bytes.toBytes(value); + for (int i = 0; i < 4; i++) { + buf[off + i] = b[i]; + } + } + + @Test + public void testZeroFileLength() throws IOException { + StoreFileList.Builder builder = StoreFileList.newBuilder(); + storeFileListFile.update(builder); + FileSystem fs = FileSystem.get(UTIL.getConfiguration()); + FileStatus trackerFileStatus = getOnlyTrackerFile(fs); + // write a zero length + byte[] content = readAll(fs, trackerFileStatus.getPath()); + writeInt(content, 0, 0); + write(fs, trackerFileStatus.getPath(), content, 0, content.length); + assertThrows(IOException.class, () -> storeFileListFile.load()); + } + + @Test + public void testBigFileLength() throws IOException { + StoreFileList.Builder builder = StoreFileList.newBuilder(); + storeFileListFile.update(builder); + FileSystem fs = FileSystem.get(UTIL.getConfiguration()); + FileStatus trackerFileStatus = getOnlyTrackerFile(fs); + // write a large length + byte[] content = readAll(fs, trackerFileStatus.getPath()); + writeInt(content, 0, 128 * 1024 * 1024); + write(fs, trackerFileStatus.getPath(), content, 0, content.length); + assertThrows(IOException.class, () -> storeFileListFile.load()); + } + + @Test + public void testChecksumMismatch() throws IOException { + StoreFileList.Builder builder = StoreFileList.newBuilder(); + storeFileListFile.update(builder); + FileSystem fs = FileSystem.get(UTIL.getConfiguration()); + FileStatus trackerFileStatus = getOnlyTrackerFile(fs); + // flip one byte + byte[] content = readAll(fs, trackerFileStatus.getPath()); + content[5] = (byte) ~content[5]; + write(fs, trackerFileStatus.getPath(), content, 0, content.length); + assertThrows(IOException.class, () -> storeFileListFile.load()); + } +}