From 7a74e6ee14ebfa68f33732a65d6a521af03893a0 Mon Sep 17 00:00:00 2001 From: Aman Poonia Date: Tue, 10 Dec 2024 02:13:10 +0530 Subject: [PATCH] HBASE-28836 Parallize the file archival to improve the split times (#6483) Signed-off-by: Viraj Jasani Signed-off-by: David Manning Signed-off-by: Umesh Kumar <9414umeshkumar@gmail.com> --- .../hadoop/hbase/backup/HFileArchiver.java | 86 ++++++++++++------- .../procedure/DeleteTableProcedure.java | 3 +- .../hbase/regionserver/HRegionFileSystem.java | 2 +- .../hbase/backup/TestHFileArchiving.java | 12 ++- 4 files changed, 66 insertions(+), 37 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java index b2ea9cd33a0b..389ad66d45c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java @@ -23,7 +23,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; @@ -97,7 +99,7 @@ public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info) public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info) throws IOException { Path rootDir = CommonFSUtils.getRootDir(conf); - archiveRegion(fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()), + archiveRegion(conf, fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()), FSUtils.getRegionDirFromRootDir(rootDir, info)); } @@ -113,8 +115,8 @@ public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo i * operations could not complete. * @throws IOException if the request cannot be completed */ - public static boolean archiveRegion(FileSystem fs, Path rootdir, Path tableDir, Path regionDir) - throws IOException { + public static boolean archiveRegion(final Configuration conf, FileSystem fs, Path rootdir, + Path tableDir, Path regionDir) throws IOException { // otherwise, we archive the files // make sure we can archive if (tableDir == null || regionDir == null) { @@ -157,8 +159,8 @@ public boolean accept(Path file) { // convert the files in the region to a File Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add); LOG.debug("Archiving " + toArchive); - List failedArchive = - resolveAndArchive(fs, regionArchiveDir, toArchive, EnvironmentEdgeManager.currentTime()); + List failedArchive = resolveAndArchive(conf, fs, regionArchiveDir, toArchive, + EnvironmentEdgeManager.currentTime()); if (!failedArchive.isEmpty()) { throw new FailedArchiveException( "Failed to archive/delete all the files for region:" + regionDir.getName() + " into " @@ -186,7 +188,7 @@ public static void archiveRegions(Configuration conf, FileSystem fs, Path rootDi List> futures = new ArrayList<>(regionDirList.size()); for (Path regionDir : regionDirList) { Future future = getArchiveExecutor(conf).submit(() -> { - archiveRegion(fs, rootDir, tableDir, regionDir); + archiveRegion(conf, fs, rootDir, tableDir, regionDir); return null; }); futures.add(future); @@ -258,8 +260,8 @@ public static void archiveFamily(FileSystem fs, Configuration conf, RegionInfo p * @param family the family hosting the store files * @throws IOException if the files could not be correctly disposed. */ - public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, RegionInfo parent, - Path familyDir, byte[] family) throws IOException { + public static void archiveFamilyByFamilyDir(FileSystem fs, final Configuration conf, + RegionInfo parent, Path familyDir, byte[] family) throws IOException { FileStatus[] storeFiles = CommonFSUtils.listStatus(fs, familyDir); if (storeFiles == null) { LOG.debug("No files to dispose of in {}, family={}", parent.getRegionNameAsString(), @@ -273,7 +275,7 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, R // do the actual archive List failedArchive = - resolveAndArchive(fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime()); + resolveAndArchive(conf, fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime()); if (!failedArchive.isEmpty()) { throw new FailedArchiveException( "Failed to archive/delete all the files for region:" @@ -293,10 +295,11 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, R * attempted; otherwise likely to cause an {@link IOException} * @throws IOException if the files could not be correctly disposed. */ - public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo, - Path tableDir, byte[] family, Collection compactedFiles) throws IOException { + public static void archiveStoreFiles(final Configuration conf, FileSystem fs, + RegionInfo regionInfo, Path tableDir, byte[] family, Collection compactedFiles) + throws IOException { Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family); - archive(fs, regionInfo, family, compactedFiles, storeArchiveDir); + archive(conf, fs, regionInfo, family, compactedFiles, storeArchiveDir); } /** @@ -327,11 +330,11 @@ public static void archiveRecoveredEdits(Configuration conf, FileSystem fs, Regi "Wrong file system! Should be " + path.toUri().getScheme() + ", but got " + fs.getScheme()); } path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, family); - archive(fs, regionInfo, family, replayedEdits, path); + archive(conf, fs, regionInfo, family, replayedEdits, path); } - private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family, - Collection compactedFiles, Path storeArchiveDir) throws IOException { + private static void archive(final Configuration conf, FileSystem fs, RegionInfo regionInfo, + byte[] family, Collection compactedFiles, Path storeArchiveDir) throws IOException { // sometimes in testing, we don't have rss, so we need to check for that if (fs == null) { LOG.warn( @@ -365,8 +368,8 @@ private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family, compactedFiles.stream().map(getStorePath).collect(Collectors.toList()); // do the actual archive - List failedArchive = - resolveAndArchive(fs, storeArchiveDir, storeFiles, EnvironmentEdgeManager.currentTime()); + List failedArchive = resolveAndArchive(conf, fs, storeArchiveDir, storeFiles, + EnvironmentEdgeManager.currentTime()); if (!failedArchive.isEmpty()) { throw new FailedArchiveException( @@ -419,8 +422,8 @@ public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInf * @return the list of failed to archive files. * @throws IOException if an unexpected file operation exception occurred */ - private static List resolveAndArchive(FileSystem fs, Path baseArchiveDir, - Collection toArchive, long start) throws IOException { + private static List resolveAndArchive(final Configuration conf, FileSystem fs, + Path baseArchiveDir, Collection toArchive, long start) throws IOException { // short circuit if no files to move if (toArchive.isEmpty()) { return Collections.emptyList(); @@ -437,33 +440,54 @@ private static List resolveAndArchive(FileSystem fs, Path baseArchiveDir, LOG.trace("Created archive directory {}", baseArchiveDir); } - List failures = new ArrayList<>(); + List failures = Collections.synchronizedList(new ArrayList<>()); String startTime = Long.toString(start); + List filesOnly = new ArrayList<>(); for (File file : toArchive) { - // if its a file archive it try { - LOG.trace("Archiving {}", file); - if (file.isFile()) { - // attempt to archive the file - if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) { - LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir); - failures.add(file); - } - } else { - // otherwise its a directory and we need to archive all files + if (!file.isFile()) { + // if its a directory and we need to archive all files LOG.trace("{} is a directory, archiving children files", file); // so we add the directory name to the one base archive Path parentArchiveDir = new Path(baseArchiveDir, file.getName()); // and then get all the files from that directory and attempt to // archive those too Collection children = file.getChildren(); - failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start)); + failures.addAll(resolveAndArchive(conf, fs, parentArchiveDir, children, start)); + } else { + filesOnly.add(file); } } catch (IOException e) { LOG.warn("Failed to archive {}", file, e); failures.add(file); } } + Map> futures = new HashMap<>(); + // In current baseDir all files will be processed concurrently + for (File file : filesOnly) { + LOG.trace("Archiving {}", file); + Future archiveTask = getArchiveExecutor(conf) + .submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime)); + futures.put(file, archiveTask); + } + + for (Map.Entry> fileFutureEntry : futures.entrySet()) { + try { + boolean fileCleaned = fileFutureEntry.getValue().get(); + if (!fileCleaned) { + LOG.warn("Couldn't archive {} into backup directory: {}", fileFutureEntry.getKey(), + baseArchiveDir); + failures.add(fileFutureEntry.getKey()); + } + } catch (InterruptedException e) { + LOG.warn("HFileArchive Cleanup thread was interrupted"); + failures.add(fileFutureEntry.getKey()); + } catch (ExecutionException e) { + // this is IOException + LOG.warn("Failed to archive {}", fileFutureEntry.getKey(), e); + failures.add(fileFutureEntry.getKey()); + } + } return failures; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java index 8c2f1067c952..836f47af9346 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java @@ -319,7 +319,8 @@ protected static void deleteFromFs(final MasterProcedureEnv env, final TableName CommonFSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME), tableName); Path regionDir = new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName()); if (fs.exists(regionDir)) { - HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir); + HFileArchiver.archiveRegion(env.getMasterConfiguration(), fs, mfs.getRootDir(), mobTableDir, + regionDir); } // Delete table directory from FS diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 6f1ba4f6b406..3ee6db42fc98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -1002,7 +1002,7 @@ public static void deleteRegionFromFileSystem(final Configuration conf, final Fi // Archive region Path rootDir = CommonFSUtils.getRootDir(conf); - HFileArchiver.archiveRegion(fs, rootDir, tableDir, regionDir); + HFileArchiver.archiveRegion(conf, fs, rootDir, tableDir, regionDir); // Delete empty region dir if (!fs.delete(regionDir, true)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java index e087b8e723e2..9ef5bf388c63 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java @@ -659,7 +659,8 @@ public void testCleaningRace() throws Exception { try { // Try to archive the file - HFileArchiver.archiveRegion(fs, rootDir, sourceRegionDir.getParent(), sourceRegionDir); + HFileArchiver.archiveRegion(conf, fs, rootDir, sourceRegionDir.getParent(), + sourceRegionDir); // The archiver succeded, the file is no longer in the original location // but it's in the archive location. @@ -691,7 +692,8 @@ public void testArchiveRegionTableAndRegionDirsNull() throws IOException { Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace"); FileSystem fileSystem = UTIL.getTestFileSystem(); // Try to archive the file but with null regionDir, can't delete sourceFile - assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, null)); + Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration(); + assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, null)); } @Test @@ -700,6 +702,7 @@ public void testArchiveRegionWithTableDirNull() throws IOException { CommonFSUtils.getTableDir(new Path("./"), TableName.valueOf(name.getMethodName())), "xyzabc"); Path familyDir = new Path(regionDir, "rd"); Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace"); + Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration(); Path file = new Path(familyDir, "1"); Path sourceFile = new Path(rootDir, file); FileSystem fileSystem = UTIL.getTestFileSystem(); @@ -707,7 +710,7 @@ public void testArchiveRegionWithTableDirNull() throws IOException { Path sourceRegionDir = new Path(rootDir, regionDir); fileSystem.mkdirs(sourceRegionDir); // Try to archive the file - assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, sourceRegionDir)); + assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, sourceRegionDir)); assertFalse(fileSystem.exists(sourceRegionDir)); } @@ -718,6 +721,7 @@ public void testArchiveRegionWithRegionDirNull() throws IOException { "elgn4nf"); Path familyDir = new Path(regionDir, "rdar"); Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace"); + Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration(); Path file = new Path(familyDir, "2"); Path sourceFile = new Path(rootDir, file); FileSystem fileSystem = UTIL.getTestFileSystem(); @@ -726,7 +730,7 @@ public void testArchiveRegionWithRegionDirNull() throws IOException { fileSystem.mkdirs(sourceRegionDir); // Try to archive the file but with null regionDir, can't delete sourceFile assertFalse( - HFileArchiver.archiveRegion(fileSystem, rootDir, sourceRegionDir.getParent(), null)); + HFileArchiver.archiveRegion(conf, fileSystem, rootDir, sourceRegionDir.getParent(), null)); assertTrue(fileSystem.exists(sourceRegionDir)); fileSystem.delete(sourceRegionDir, true); }