Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28836 Parallize the file archival to improve the split times #6483

Merged
merged 1 commit into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -97,7 +99,7 @@ public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info)
public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info)
throws IOException {
Path rootDir = CommonFSUtils.getRootDir(conf);
archiveRegion(fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
archiveRegion(conf, fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
FSUtils.getRegionDirFromRootDir(rootDir, info));
}

Expand All @@ -113,8 +115,8 @@ public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo i
* operations could not complete.
* @throws IOException if the request cannot be completed
*/
public static boolean archiveRegion(FileSystem fs, Path rootdir, Path tableDir, Path regionDir)
throws IOException {
public static boolean archiveRegion(final Configuration conf, FileSystem fs, Path rootdir,
Path tableDir, Path regionDir) throws IOException {
// otherwise, we archive the files
// make sure we can archive
if (tableDir == null || regionDir == null) {
Expand Down Expand Up @@ -157,8 +159,8 @@ public boolean accept(Path file) {
// convert the files in the region to a File
Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add);
LOG.debug("Archiving " + toArchive);
List<File> failedArchive =
resolveAndArchive(fs, regionArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
List<File> failedArchive = resolveAndArchive(conf, fs, regionArchiveDir, toArchive,
EnvironmentEdgeManager.currentTime());
if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
"Failed to archive/delete all the files for region:" + regionDir.getName() + " into "
Expand Down Expand Up @@ -186,7 +188,7 @@ public static void archiveRegions(Configuration conf, FileSystem fs, Path rootDi
List<Future<Void>> futures = new ArrayList<>(regionDirList.size());
for (Path regionDir : regionDirList) {
Future<Void> future = getArchiveExecutor(conf).submit(() -> {
archiveRegion(fs, rootDir, tableDir, regionDir);
archiveRegion(conf, fs, rootDir, tableDir, regionDir);
return null;
});
futures.add(future);
Expand Down Expand Up @@ -258,8 +260,8 @@ public static void archiveFamily(FileSystem fs, Configuration conf, RegionInfo p
* @param family the family hosting the store files
* @throws IOException if the files could not be correctly disposed.
*/
public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, RegionInfo parent,
Path familyDir, byte[] family) throws IOException {
public static void archiveFamilyByFamilyDir(FileSystem fs, final Configuration conf,
RegionInfo parent, Path familyDir, byte[] family) throws IOException {
FileStatus[] storeFiles = CommonFSUtils.listStatus(fs, familyDir);
if (storeFiles == null) {
LOG.debug("No files to dispose of in {}, family={}", parent.getRegionNameAsString(),
Expand All @@ -273,7 +275,7 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, R

// do the actual archive
List<File> failedArchive =
resolveAndArchive(fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
resolveAndArchive(conf, fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
"Failed to archive/delete all the files for region:"
Expand All @@ -293,10 +295,11 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, R
* attempted; otherwise likely to cause an {@link IOException}
* @throws IOException if the files could not be correctly disposed.
*/
public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo,
Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles) throws IOException {
public static void archiveStoreFiles(final Configuration conf, FileSystem fs,
RegionInfo regionInfo, Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles)
throws IOException {
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
archive(fs, regionInfo, family, compactedFiles, storeArchiveDir);
archive(conf, fs, regionInfo, family, compactedFiles, storeArchiveDir);
}

/**
Expand Down Expand Up @@ -327,11 +330,11 @@ public static void archiveRecoveredEdits(Configuration conf, FileSystem fs, Regi
"Wrong file system! Should be " + path.toUri().getScheme() + ", but got " + fs.getScheme());
}
path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, family);
archive(fs, regionInfo, family, replayedEdits, path);
archive(conf, fs, regionInfo, family, replayedEdits, path);
}

private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
private static void archive(final Configuration conf, FileSystem fs, RegionInfo regionInfo,
byte[] family, Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
// sometimes in testing, we don't have rss, so we need to check for that
if (fs == null) {
LOG.warn(
Expand Down Expand Up @@ -365,8 +368,8 @@ private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
compactedFiles.stream().map(getStorePath).collect(Collectors.toList());

// do the actual archive
List<File> failedArchive =
resolveAndArchive(fs, storeArchiveDir, storeFiles, EnvironmentEdgeManager.currentTime());
List<File> failedArchive = resolveAndArchive(conf, fs, storeArchiveDir, storeFiles,
EnvironmentEdgeManager.currentTime());

if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
Expand Down Expand Up @@ -419,8 +422,8 @@ public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInf
* @return the list of failed to archive files.
* @throws IOException if an unexpected file operation exception occurred
*/
private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
Collection<File> toArchive, long start) throws IOException {
private static List<File> resolveAndArchive(final Configuration conf, FileSystem fs,
Path baseArchiveDir, Collection<File> toArchive, long start) throws IOException {
// short circuit if no files to move
if (toArchive.isEmpty()) {
return Collections.emptyList();
Expand All @@ -437,33 +440,54 @@ private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
LOG.trace("Created archive directory {}", baseArchiveDir);
}

List<File> failures = new ArrayList<>();
List<File> failures = Collections.synchronizedList(new ArrayList<>());
String startTime = Long.toString(start);
List<File> filesOnly = new ArrayList<>();
for (File file : toArchive) {
// if its a file archive it
try {
LOG.trace("Archiving {}", file);
if (file.isFile()) {
// attempt to archive the file
if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
failures.add(file);
}
} else {
// otherwise its a directory and we need to archive all files
if (!file.isFile()) {
// if its a directory and we need to archive all files
LOG.trace("{} is a directory, archiving children files", file);
// so we add the directory name to the one base archive
Path parentArchiveDir = new Path(baseArchiveDir, file.getName());
// and then get all the files from that directory and attempt to
// archive those too
Collection<File> children = file.getChildren();
failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
failures.addAll(resolveAndArchive(conf, fs, parentArchiveDir, children, start));
} else {
filesOnly.add(file);
}
} catch (IOException e) {
LOG.warn("Failed to archive {}", file, e);
failures.add(file);
}
}
Map<File, Future<Boolean>> futures = new HashMap<>();
// In current baseDir all files will be processed concurrently
for (File file : filesOnly) {
LOG.trace("Archiving {}", file);
Future<Boolean> archiveTask = getArchiveExecutor(conf)
.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
futures.put(file, archiveTask);
}

for (Map.Entry<File, Future<Boolean>> fileFutureEntry : futures.entrySet()) {
try {
boolean fileCleaned = fileFutureEntry.getValue().get();
if (!fileCleaned) {
LOG.warn("Couldn't archive {} into backup directory: {}", fileFutureEntry.getKey(),
baseArchiveDir);
failures.add(fileFutureEntry.getKey());
}
} catch (InterruptedException e) {
LOG.warn("HFileArchive Cleanup thread was interrupted");
mnpoonia marked this conversation as resolved.
Show resolved Hide resolved
failures.add(fileFutureEntry.getKey());
} catch (ExecutionException e) {
// this is IOException
LOG.warn("Failed to archive {}", fileFutureEntry.getKey(), e);
failures.add(fileFutureEntry.getKey());
}
}
return failures;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ protected static void deleteFromFs(final MasterProcedureEnv env, final TableName
CommonFSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME), tableName);
Path regionDir = new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
if (fs.exists(regionDir)) {
HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir);
HFileArchiver.archiveRegion(env.getMasterConfiguration(), fs, mfs.getRootDir(), mobTableDir,
regionDir);
}

// Delete table directory from FS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ public static void deleteRegionFromFileSystem(final Configuration conf, final Fi

// Archive region
Path rootDir = CommonFSUtils.getRootDir(conf);
HFileArchiver.archiveRegion(fs, rootDir, tableDir, regionDir);
HFileArchiver.archiveRegion(conf, fs, rootDir, tableDir, regionDir);

// Delete empty region dir
if (!fs.delete(regionDir, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,8 @@ public void testCleaningRace() throws Exception {

try {
// Try to archive the file
HFileArchiver.archiveRegion(fs, rootDir, sourceRegionDir.getParent(), sourceRegionDir);
HFileArchiver.archiveRegion(conf, fs, rootDir, sourceRegionDir.getParent(),
sourceRegionDir);

// The archiver succeded, the file is no longer in the original location
// but it's in the archive location.
Expand Down Expand Up @@ -691,7 +692,8 @@ public void testArchiveRegionTableAndRegionDirsNull() throws IOException {
Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
FileSystem fileSystem = UTIL.getTestFileSystem();
// Try to archive the file but with null regionDir, can't delete sourceFile
assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, null));
Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, null));
}

@Test
Expand All @@ -700,14 +702,15 @@ public void testArchiveRegionWithTableDirNull() throws IOException {
CommonFSUtils.getTableDir(new Path("./"), TableName.valueOf(name.getMethodName())), "xyzabc");
Path familyDir = new Path(regionDir, "rd");
Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
Path file = new Path(familyDir, "1");
Path sourceFile = new Path(rootDir, file);
FileSystem fileSystem = UTIL.getTestFileSystem();
fileSystem.createNewFile(sourceFile);
Path sourceRegionDir = new Path(rootDir, regionDir);
fileSystem.mkdirs(sourceRegionDir);
// Try to archive the file
assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, sourceRegionDir));
assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, sourceRegionDir));
assertFalse(fileSystem.exists(sourceRegionDir));
}

Expand All @@ -718,6 +721,7 @@ public void testArchiveRegionWithRegionDirNull() throws IOException {
"elgn4nf");
Path familyDir = new Path(regionDir, "rdar");
Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
Path file = new Path(familyDir, "2");
Path sourceFile = new Path(rootDir, file);
FileSystem fileSystem = UTIL.getTestFileSystem();
Expand All @@ -726,7 +730,7 @@ public void testArchiveRegionWithRegionDirNull() throws IOException {
fileSystem.mkdirs(sourceRegionDir);
// Try to archive the file but with null regionDir, can't delete sourceFile
assertFalse(
HFileArchiver.archiveRegion(fileSystem, rootDir, sourceRegionDir.getParent(), null));
HFileArchiver.archiveRegion(conf, fileSystem, rootDir, sourceRegionDir.getParent(), null));
assertTrue(fileSystem.exists(sourceRegionDir));
fileSystem.delete(sourceRegionDir, true);
}
Expand Down