diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java index b1e298dbbe22..99fd3d43572e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java @@ -38,13 +38,6 @@ public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, Sto super(conf, isPrimaryReplica, ctx); } - @Override - public List load() throws IOException { - List files = - ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString()); - return files != null ? files : Collections.emptyList(); - } - @Override public boolean requireWritingToTmpDirFirst() { return true; @@ -62,7 +55,13 @@ protected void doAddCompactionResults(Collection compactedFiles, } @Override - public void set(List files) { - // NOOP + protected List doLoadStoreFiles(boolean readOnly) throws IOException { + List files = + ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString()); + return files != null ? files : Collections.emptyList(); + } + + @Override + protected void doSetStoreFiles(Collection files) throws IOException { } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java index 8d9b66e53d2a..91e1bdc7dc67 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java @@ -67,8 +67,8 @@ public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, S } @Override - public List load() throws IOException { - StoreFileList list = backedFile.load(); + protected List doLoadStoreFiles(boolean readOnly) throws IOException { + StoreFileList list = backedFile.load(readOnly); if (list == null) { return Collections.emptyList(); } @@ -148,7 +148,7 @@ protected void doAddCompactionResults(Collection compactedFiles, } @Override - public void set(List files) throws IOException { + protected void doSetStoreFiles(Collection files) throws IOException { synchronized (storefiles) { storefiles.clear(); StoreFileList.Builder builder = StoreFileList.newBuilder(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java index 53a474d3bde7..f483d3386729 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java @@ -49,13 +49,6 @@ public MigrationStoreFileTracker(Configuration conf, boolean isPrimaryReplica, S "src and dst is the same: %s", src.getClass()); } - @Override - public List load() throws IOException { - List files = src.load(); - dst.set(files); - return files; - } - @Override public boolean requireWritingToTmpDirFirst() { // Returns true if either of the two StoreFileTracker returns true. @@ -67,6 +60,15 @@ public boolean requireWritingToTmpDirFirst() { return src.requireWritingToTmpDirFirst() || dst.requireWritingToTmpDirFirst(); } + @Override + protected List doLoadStoreFiles(boolean readOnly) throws IOException { + List files = src.doLoadStoreFiles(readOnly); + if (!readOnly) { + dst.doSetStoreFiles(files); + } + return files; + } + @Override protected void doAddNewStoreFiles(Collection newFiles) throws IOException { src.doAddNewStoreFiles(newFiles); @@ -81,7 +83,7 @@ protected void doAddCompactionResults(Collection compactedFiles, } @Override - public void set(List files) { + protected void doSetStoreFiles(Collection files) throws IOException { throw new UnsupportedOperationException( "Should not call this method on " + getClass().getSimpleName()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java index ced01187b69b..5ed35c7beae1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java @@ -20,17 +20,29 @@ import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.ForkJoinPool; +import java.util.regex.Pattern; import java.util.zip.CRC32; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.regionserver.StoreContext; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Splitter; + import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList; /** @@ -55,9 +67,13 @@ class StoreFileListFile { static final String TRACK_FILE_DIR = ".filelist"; - private static final String TRACK_FILE = "f1"; + private static final String TRACK_FILE_PREFIX = "f1"; + + private static final String TRACK_FILE_ROTATE_PREFIX = "f2"; - private static final String TRACK_FILE_ROTATE = "f2"; + private static final char TRACK_FILE_SEPARATOR = '.'; + + private static final Pattern TRACK_FILE_PATTERN = Pattern.compile("^f(1|2)\\.\\d+$"); // 16 MB, which is big enough for a tracker file private static final int MAX_FILE_SIZE = 16 * 1024 * 1024; @@ -76,8 +92,6 @@ class StoreFileListFile { StoreFileListFile(StoreContext ctx) { this.ctx = ctx; trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR); - trackFiles[0] = new Path(trackFileDir, TRACK_FILE); - trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE); } private StoreFileList load(Path path) throws IOException { @@ -114,23 +128,103 @@ private int select(StoreFileList[] lists) { return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1; } - StoreFileList load() throws IOException { + // file sequence id to path + private NavigableMap> listFiles() throws IOException { + FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); + FileStatus[] statuses; + try { + statuses = fs.listStatus(trackFileDir); + } catch (FileNotFoundException e) { + LOG.debug("Track file directory {} does not exist", trackFileDir, e); + return Collections.emptyNavigableMap(); + } + if (statuses == null || statuses.length == 0) { + return Collections.emptyNavigableMap(); + } + TreeMap> map = new TreeMap<>((l1, l2) -> l2.compareTo(l1)); + for (FileStatus status : statuses) { + Path file = status.getPath(); + if (!status.isFile()) { + LOG.warn("Found invalid track file {}, which is not a file", file); + continue; + } + if (!TRACK_FILE_PATTERN.matcher(file.getName()).matches()) { + LOG.warn("Found invalid track file {}, skip", file); + continue; + } + List parts = Splitter.on(TRACK_FILE_SEPARATOR).splitToList(file.getName()); + map.computeIfAbsent(Long.parseLong(parts.get(1)), k -> new ArrayList<>()).add(file); + } + return map; + } + + private void initializeTrackFiles(long seqId) { + trackFiles[0] = new Path(trackFileDir, TRACK_FILE_PREFIX + TRACK_FILE_SEPARATOR + seqId); + trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE_PREFIX + TRACK_FILE_SEPARATOR + seqId); + LOG.info("Initialized track files: {}, {}", trackFiles[0], trackFiles[1]); + } + + private void cleanUpTrackFiles(long loadedSeqId, + NavigableMap> seqId2TrackFiles) { + LOG.info("Cleanup track file with sequence id < {}", loadedSeqId); + FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); + NavigableMap> toDelete = + loadedSeqId >= 0 ? seqId2TrackFiles.tailMap(loadedSeqId, false) : seqId2TrackFiles; + toDelete.values().stream().flatMap(l -> l.stream()).forEach(file -> { + ForkJoinPool.commonPool().execute(() -> { + LOG.info("Deleting track file {}", file); + try { + fs.delete(file, false); + } catch (IOException e) { + LOG.warn("failed to delete unused track file {}", file, e); + } + }); + }); + } + + StoreFileList load(boolean readOnly) throws IOException { + NavigableMap> seqId2TrackFiles = listFiles(); + long seqId = -1L; StoreFileList[] lists = new StoreFileList[2]; - for (int i = 0; i < 2; i++) { - try { - lists[i] = load(trackFiles[i]); - } catch (FileNotFoundException | EOFException e) { - // this is normal case, so use info and do not log stacktrace - LOG.info("Failed to load track file {}: {}", trackFiles[i], e.toString()); + for (Map.Entry> entry : seqId2TrackFiles.entrySet()) { + List files = entry.getValue(); + // should not have more than 2 files, if not, it means that the track files are broken, just + // throw exception out and fail the region open. + if (files.size() > 2) { + throw new DoNotRetryIOException("Should only have at most 2 track files for sequence id " + + entry.getKey() + ", but got " + files.size() + " files: " + files); + } + boolean loaded = false; + for (int i = 0; i < files.size(); i++) { + try { + lists[i] = load(files.get(i)); + loaded = true; + } catch (EOFException e) { + // this is normal case, so use info and do not log stacktrace + LOG.info("Failed to load track file {}: {}", trackFiles[i], e.toString()); + } + } + if (loaded) { + seqId = entry.getKey(); + break; } } - int winnerIndex = select(lists); - if (lists[winnerIndex] != null) { - nextTrackFile = 1 - winnerIndex; - prevTimestamp = lists[winnerIndex].getTimestamp(); - } else { + if (readOnly) { + return lists[select(lists)]; + } + + cleanUpTrackFiles(seqId, seqId2TrackFiles); + + if (seqId < 0) { + initializeTrackFiles(System.currentTimeMillis()); nextTrackFile = 0; + return null; } + + initializeTrackFiles(Math.max(System.currentTimeMillis(), seqId + 1)); + int winnerIndex = select(lists); + nextTrackFile = 1 - winnerIndex; + prevTimestamp = lists[winnerIndex].getTimestamp(); return lists[winnerIndex]; } @@ -140,7 +234,8 @@ StoreFileList load() throws IOException { void update(StoreFileList.Builder builder) throws IOException { if (nextTrackFile < 0) { // we need to call load first to load the prevTimestamp and also the next file - load(); + // we are already in the update method, which is not read only, so pass false + load(false); } long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime()); byte[] actualData = builder.setTimestamp(timestamp).build().toByteArray(); @@ -162,7 +257,7 @@ void update(StoreFileList.Builder builder) throws IOException { fs.delete(trackFiles[nextTrackFile], false); } catch (IOException e) { // we will create new file with overwrite = true, so not a big deal here, only for speed up - // loading as we do not need to read this file when loading(we will hit FileNotFoundException) + // loading as we do not need to read this file when loading LOG.debug("failed to delete old track file {}, not a big deal, just ignore", e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java index f45655230f83..8b8c7699e7b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.util.Collection; - +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; @@ -66,6 +66,11 @@ protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, Sto this.ctx = ctx; } + @Override + public final List load() throws IOException { + return doLoadStoreFiles(!isPrimaryReplica); + } + @Override public final void add(Collection newFiles) throws IOException { if (isPrimaryReplica) { @@ -81,6 +86,13 @@ public final void replace(Collection compactedFiles, } } + @Override + public final void set(List files) throws IOException { + if (isPrimaryReplica) { + doSetStoreFiles(files); + } + } + @Override public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) { builder.setValue(TRACKER_IMPL, getTrackerName()); @@ -173,8 +185,19 @@ public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) th return builder.build(); } + /** + * For primary replica, we will call load once when opening a region, and the implementation could + * choose to do some cleanup work. So here we use {@code readOnly} to indicate that whether you + * are allowed to do the cleanup work. For secondary replicas, we will set {@code readOnly} to + * {@code true}. + */ + protected abstract List doLoadStoreFiles(boolean readOnly) throws IOException; + protected abstract void doAddNewStoreFiles(Collection newFiles) throws IOException; protected abstract void doAddCompactionResults(Collection compactedFiles, Collection newFiles) throws IOException; + + protected abstract void doSetStoreFiles(Collection files) throws IOException; + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerForTest.java index abef80acb9d5..4a90beeb5249 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerForTest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerForTest.java @@ -57,7 +57,7 @@ protected void doAddNewStoreFiles(Collection newFiles) throws IOE } @Override - public List load() throws IOException { + protected List doLoadStoreFiles(boolean readOnly) throws IOException { return new ArrayList<>(trackedFiles.get(storeId)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileListFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileListFile.java index 2aba24b4a46f..dbb17c5b4fce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileListFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileListFile.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hbase.regionserver.storefiletracker; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -47,6 +49,7 @@ import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; +import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList; @Category({ RegionServerTests.class, SmallTests.class }) @@ -67,14 +70,18 @@ public class TestStoreFileListFile { @Rule public TestName name = new TestName(); - @Before - public void setUp() throws IOException { - testDir = UTIL.getDataTestDir(name.getMethodName()); + private StoreFileListFile create() throws IOException { HRegionFileSystem hfs = mock(HRegionFileSystem.class); when(hfs.getFileSystem()).thenReturn(FileSystem.get(UTIL.getConfiguration())); StoreContext ctx = StoreContext.getBuilder().withFamilyStoreDirectoryPath(testDir) .withRegionFileSystem(hfs).build(); - storeFileListFile = new StoreFileListFile(ctx); + return new StoreFileListFile(ctx); + } + + @Before + public void setUp() throws IOException { + testDir = UTIL.getDataTestDir(name.getMethodName()); + storeFileListFile = create(); } @AfterClass @@ -84,7 +91,7 @@ public static void tearDown() { @Test public void testEmptyLoad() throws IOException { - assertNull(storeFileListFile.load()); + assertNull(storeFileListFile.load(false)); } private FileStatus getOnlyTrackerFile(FileSystem fs) throws IOException { @@ -114,7 +121,7 @@ public void testLoadPartial() throws IOException { trackerFileStatus.getLen(), trackerFileStatus.getLen() / 2); byte[] content = readAll(fs, trackerFileStatus.getPath()); write(fs, trackerFileStatus.getPath(), content, 0, content.length / 2); - assertNull(storeFileListFile.load()); + assertNull(storeFileListFile.load(false)); } private void writeInt(byte[] buf, int off, int value) { @@ -134,7 +141,7 @@ public void testZeroFileLength() throws IOException { byte[] content = readAll(fs, trackerFileStatus.getPath()); writeInt(content, 0, 0); write(fs, trackerFileStatus.getPath(), content, 0, content.length); - assertThrows(IOException.class, () -> storeFileListFile.load()); + assertThrows(IOException.class, () -> storeFileListFile.load(false)); } @Test @@ -147,7 +154,7 @@ public void testBigFileLength() throws IOException { byte[] content = readAll(fs, trackerFileStatus.getPath()); writeInt(content, 0, 128 * 1024 * 1024); write(fs, trackerFileStatus.getPath(), content, 0, content.length); - assertThrows(IOException.class, () -> storeFileListFile.load()); + assertThrows(IOException.class, () -> storeFileListFile.load(false)); } @Test @@ -160,6 +167,59 @@ public void testChecksumMismatch() throws IOException { byte[] content = readAll(fs, trackerFileStatus.getPath()); content[5] = (byte) ~content[5]; write(fs, trackerFileStatus.getPath(), content, 0, content.length); - assertThrows(IOException.class, () -> storeFileListFile.load()); + assertThrows(IOException.class, () -> storeFileListFile.load(false)); + } + + @Test + public void testLoadNewerTrackFiles() throws IOException, InterruptedException { + StoreFileList.Builder builder = StoreFileList.newBuilder(); + storeFileListFile.update(builder); + + FileSystem fs = FileSystem.get(UTIL.getConfiguration()); + FileStatus trackFileStatus = getOnlyTrackerFile(fs); + + builder.addStoreFile(StoreFileEntry.newBuilder().setName("hehe").setSize(10).build()); + storeFileListFile = create(); + storeFileListFile.update(builder); + + // should load the list we stored the second time + storeFileListFile = create(); + StoreFileList list = storeFileListFile.load(true); + assertEquals(1, list.getStoreFileCount()); + // since read only is true, we should not delete the old track file + // the deletion is in background, so we will test it multiple times through HTU.waitFor and make + // sure that it is still there after timeout, i.e, the waitFor method returns -1 + assertTrue(UTIL.waitFor(2000, 100, false, () -> !fs.exists(testDir)) < 0); + + // this time read only is false, we should delete the old track file + list = storeFileListFile.load(false); + assertEquals(1, list.getStoreFileCount()); + UTIL.waitFor(5000, () -> !fs.exists(trackFileStatus.getPath())); + } + + // This is to simulate the scenario where a 'dead' RS perform flush or compaction on a region + // which has already been reassigned to another RS. This is possible in real world, usually caused + // by a long STW GC. + @Test + public void testConcurrentUpdate() throws IOException { + storeFileListFile.update(StoreFileList.newBuilder()); + + StoreFileListFile storeFileListFile2 = create(); + storeFileListFile2.update(StoreFileList.newBuilder() + .addStoreFile(StoreFileEntry.newBuilder().setName("hehe").setSize(10).build())); + + // let's update storeFileListFile several times + for (int i = 0; i < 10; i++) { + storeFileListFile.update(StoreFileList.newBuilder() + .addStoreFile(StoreFileEntry.newBuilder().setName("haha-" + i).setSize(100 + i).build())); + } + + // create a new list file, make sure we load the list generate by storeFileListFile2. + StoreFileListFile storeFileListFile3 = create(); + StoreFileList fileList = storeFileListFile3.load(true); + assertEquals(1, fileList.getStoreFileCount()); + StoreFileEntry entry = fileList.getStoreFile(0); + assertEquals("hehe", entry.getName()); + assertEquals(10, entry.getSize()); } }