Skip to content

Commit

Permalink
HBASE-28836 Parallize the file archival to improve the split times (a…
Browse files Browse the repository at this point in the history
…pache#6483)

Signed-off-by: Viraj Jasani <[email protected]>
Signed-off-by: David Manning <[email protected]>
Signed-off-by: Umesh Kumar <[email protected]>
  • Loading branch information
mnpoonia authored and virajjasani committed Dec 9, 2024
1 parent ff40fcd commit 8f6cff2
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -97,7 +99,7 @@ public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info)
public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info)
throws IOException {
Path rootDir = CommonFSUtils.getRootDir(conf);
archiveRegion(fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
archiveRegion(conf, fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
FSUtils.getRegionDirFromRootDir(rootDir, info));
}

Expand All @@ -113,8 +115,8 @@ public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo i
* operations could not complete.
* @throws IOException if the request cannot be completed
*/
public static boolean archiveRegion(FileSystem fs, Path rootdir, Path tableDir, Path regionDir)
throws IOException {
public static boolean archiveRegion(final Configuration conf, FileSystem fs, Path rootdir,
Path tableDir, Path regionDir) throws IOException {
// otherwise, we archive the files
// make sure we can archive
if (tableDir == null || regionDir == null) {
Expand Down Expand Up @@ -157,8 +159,8 @@ public boolean accept(Path file) {
// convert the files in the region to a File
Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add);
LOG.debug("Archiving " + toArchive);
List<File> failedArchive =
resolveAndArchive(fs, regionArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
List<File> failedArchive = resolveAndArchive(conf, fs, regionArchiveDir, toArchive,
EnvironmentEdgeManager.currentTime());
if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
"Failed to archive/delete all the files for region:" + regionDir.getName() + " into "
Expand Down Expand Up @@ -186,7 +188,7 @@ public static void archiveRegions(Configuration conf, FileSystem fs, Path rootDi
List<Future<Void>> futures = new ArrayList<>(regionDirList.size());
for (Path regionDir : regionDirList) {
Future<Void> future = getArchiveExecutor(conf).submit(() -> {
archiveRegion(fs, rootDir, tableDir, regionDir);
archiveRegion(conf, fs, rootDir, tableDir, regionDir);
return null;
});
futures.add(future);
Expand Down Expand Up @@ -258,8 +260,8 @@ public static void archiveFamily(FileSystem fs, Configuration conf, RegionInfo p
* @param family the family hosting the store files
* @throws IOException if the files could not be correctly disposed.
*/
public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, RegionInfo parent,
Path familyDir, byte[] family) throws IOException {
public static void archiveFamilyByFamilyDir(FileSystem fs, final Configuration conf,
RegionInfo parent, Path familyDir, byte[] family) throws IOException {
FileStatus[] storeFiles = CommonFSUtils.listStatus(fs, familyDir);
if (storeFiles == null) {
LOG.debug("No files to dispose of in {}, family={}", parent.getRegionNameAsString(),
Expand All @@ -273,7 +275,7 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, R

// do the actual archive
List<File> failedArchive =
resolveAndArchive(fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
resolveAndArchive(conf, fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
"Failed to archive/delete all the files for region:"
Expand All @@ -293,10 +295,11 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, R
* attempted; otherwise likely to cause an {@link IOException}
* @throws IOException if the files could not be correctly disposed.
*/
public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo,
Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles) throws IOException {
public static void archiveStoreFiles(final Configuration conf, FileSystem fs,
RegionInfo regionInfo, Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles)
throws IOException {
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
archive(fs, regionInfo, family, compactedFiles, storeArchiveDir);
archive(conf, fs, regionInfo, family, compactedFiles, storeArchiveDir);
}

/**
Expand Down Expand Up @@ -327,11 +330,11 @@ public static void archiveRecoveredEdits(Configuration conf, FileSystem fs, Regi
"Wrong file system! Should be " + path.toUri().getScheme() + ", but got " + fs.getScheme());
}
path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, family);
archive(fs, regionInfo, family, replayedEdits, path);
archive(conf, fs, regionInfo, family, replayedEdits, path);
}

private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
private static void archive(final Configuration conf, FileSystem fs, RegionInfo regionInfo,
byte[] family, Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
// sometimes in testing, we don't have rss, so we need to check for that
if (fs == null) {
LOG.warn(
Expand Down Expand Up @@ -365,8 +368,8 @@ private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
compactedFiles.stream().map(getStorePath).collect(Collectors.toList());

// do the actual archive
List<File> failedArchive =
resolveAndArchive(fs, storeArchiveDir, storeFiles, EnvironmentEdgeManager.currentTime());
List<File> failedArchive = resolveAndArchive(conf, fs, storeArchiveDir, storeFiles,
EnvironmentEdgeManager.currentTime());

if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
Expand Down Expand Up @@ -419,8 +422,8 @@ public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInf
* @return the list of failed to archive files.
* @throws IOException if an unexpected file operation exception occurred
*/
private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
Collection<File> toArchive, long start) throws IOException {
private static List<File> resolveAndArchive(final Configuration conf, FileSystem fs,
Path baseArchiveDir, Collection<File> toArchive, long start) throws IOException {
// short circuit if no files to move
if (toArchive.isEmpty()) {
return Collections.emptyList();
Expand All @@ -437,33 +440,54 @@ private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
LOG.trace("Created archive directory {}", baseArchiveDir);
}

List<File> failures = new ArrayList<>();
List<File> failures = Collections.synchronizedList(new ArrayList<>());
String startTime = Long.toString(start);
List<File> filesOnly = new ArrayList<>();
for (File file : toArchive) {
// if its a file archive it
try {
LOG.trace("Archiving {}", file);
if (file.isFile()) {
// attempt to archive the file
if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
failures.add(file);
}
} else {
// otherwise its a directory and we need to archive all files
if (!file.isFile()) {
// if its a directory and we need to archive all files
LOG.trace("{} is a directory, archiving children files", file);
// so we add the directory name to the one base archive
Path parentArchiveDir = new Path(baseArchiveDir, file.getName());
// and then get all the files from that directory and attempt to
// archive those too
Collection<File> children = file.getChildren();
failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
failures.addAll(resolveAndArchive(conf, fs, parentArchiveDir, children, start));
} else {
filesOnly.add(file);
}
} catch (IOException e) {
LOG.warn("Failed to archive {}", file, e);
failures.add(file);
}
}
Map<File, Future<Boolean>> futures = new HashMap<>();
// In current baseDir all files will be processed concurrently
for (File file : filesOnly) {
LOG.trace("Archiving {}", file);
Future<Boolean> archiveTask = getArchiveExecutor(conf)
.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
futures.put(file, archiveTask);
}

for (Map.Entry<File, Future<Boolean>> fileFutureEntry : futures.entrySet()) {
try {
boolean fileCleaned = fileFutureEntry.getValue().get();
if (!fileCleaned) {
LOG.warn("Couldn't archive {} into backup directory: {}", fileFutureEntry.getKey(),
baseArchiveDir);
failures.add(fileFutureEntry.getKey());
}
} catch (InterruptedException e) {
LOG.warn("HFileArchive Cleanup thread was interrupted");
failures.add(fileFutureEntry.getKey());
} catch (ExecutionException e) {
// this is IOException
LOG.warn("Failed to archive {}", fileFutureEntry.getKey(), e);
failures.add(fileFutureEntry.getKey());
}
}
return failures;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ protected static void deleteFromFs(final MasterProcedureEnv env, final TableName
CommonFSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME), tableName);
Path regionDir = new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
if (fs.exists(regionDir)) {
HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir);
HFileArchiver.archiveRegion(env.getMasterConfiguration(), fs, mfs.getRootDir(), mobTableDir,
regionDir);
}

// Delete table directory from FS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ public static void deleteRegionFromFileSystem(final Configuration conf, final Fi

// Archive region
Path rootDir = CommonFSUtils.getRootDir(conf);
HFileArchiver.archiveRegion(fs, rootDir, tableDir, regionDir);
HFileArchiver.archiveRegion(conf, fs, rootDir, tableDir, regionDir);

// Delete empty region dir
if (!fs.delete(regionDir, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,8 @@ public void testCleaningRace() throws Exception {

try {
// Try to archive the file
HFileArchiver.archiveRegion(fs, rootDir, sourceRegionDir.getParent(), sourceRegionDir);
HFileArchiver.archiveRegion(conf, fs, rootDir, sourceRegionDir.getParent(),
sourceRegionDir);

// The archiver succeded, the file is no longer in the original location
// but it's in the archive location.
Expand Down Expand Up @@ -683,7 +684,8 @@ public void testArchiveRegionTableAndRegionDirsNull() throws IOException {
Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
FileSystem fileSystem = UTIL.getTestFileSystem();
// Try to archive the file but with null regionDir, can't delete sourceFile
assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, null));
Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, null));
}

@Test
Expand All @@ -692,14 +694,15 @@ public void testArchiveRegionWithTableDirNull() throws IOException {
CommonFSUtils.getTableDir(new Path("./"), TableName.valueOf(name.getMethodName())), "xyzabc");
Path familyDir = new Path(regionDir, "rd");
Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
Path file = new Path(familyDir, "1");
Path sourceFile = new Path(rootDir, file);
FileSystem fileSystem = UTIL.getTestFileSystem();
fileSystem.createNewFile(sourceFile);
Path sourceRegionDir = new Path(rootDir, regionDir);
fileSystem.mkdirs(sourceRegionDir);
// Try to archive the file
assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, sourceRegionDir));
assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, sourceRegionDir));
assertFalse(fileSystem.exists(sourceRegionDir));
}

Expand All @@ -710,6 +713,7 @@ public void testArchiveRegionWithRegionDirNull() throws IOException {
"elgn4nf");
Path familyDir = new Path(regionDir, "rdar");
Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
Path file = new Path(familyDir, "2");
Path sourceFile = new Path(rootDir, file);
FileSystem fileSystem = UTIL.getTestFileSystem();
Expand All @@ -718,7 +722,7 @@ public void testArchiveRegionWithRegionDirNull() throws IOException {
fileSystem.mkdirs(sourceRegionDir);
// Try to archive the file but with null regionDir, can't delete sourceFile
assertFalse(
HFileArchiver.archiveRegion(fileSystem, rootDir, sourceRegionDir.getParent(), null));
HFileArchiver.archiveRegion(conf, fileSystem, rootDir, sourceRegionDir.getParent(), null));
assertTrue(fileSystem.exists(sourceRegionDir));
fileSystem.delete(sourceRegionDir, true);
}
Expand Down

0 comments on commit 8f6cff2

Please sign in to comment.