Skip to content

Commit

Permalink
Revert "HBASE-28836 Parallize the file archival to improve the split …
Browse files Browse the repository at this point in the history
…times (apache#6483)"

This reverts commit 262c5bb.
  • Loading branch information
virajjasani authored and ragarkar committed Jan 13, 2025
1 parent c5d4c65 commit f69447b
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -99,7 +97,7 @@ public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info)
public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info)
throws IOException {
Path rootDir = CommonFSUtils.getRootDir(conf);
archiveRegion(conf, fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
archiveRegion(fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
FSUtils.getRegionDirFromRootDir(rootDir, info));
}

Expand All @@ -115,8 +113,8 @@ public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo i
* operations could not complete.
* @throws IOException if the request cannot be completed
*/
public static boolean archiveRegion(final Configuration conf, FileSystem fs, Path rootdir,
Path tableDir, Path regionDir) throws IOException {
public static boolean archiveRegion(FileSystem fs, Path rootdir, Path tableDir, Path regionDir)
throws IOException {
// otherwise, we archive the files
// make sure we can archive
if (tableDir == null || regionDir == null) {
Expand Down Expand Up @@ -159,8 +157,8 @@ public boolean accept(Path file) {
// convert the files in the region to a File
Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add);
LOG.debug("Archiving " + toArchive);
List<File> failedArchive = resolveAndArchive(conf, fs, regionArchiveDir, toArchive,
EnvironmentEdgeManager.currentTime());
List<File> failedArchive =
resolveAndArchive(fs, regionArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
"Failed to archive/delete all the files for region:" + regionDir.getName() + " into "
Expand Down Expand Up @@ -188,7 +186,7 @@ public static void archiveRegions(Configuration conf, FileSystem fs, Path rootDi
List<Future<Void>> futures = new ArrayList<>(regionDirList.size());
for (Path regionDir : regionDirList) {
Future<Void> future = getArchiveExecutor(conf).submit(() -> {
archiveRegion(conf, fs, rootDir, tableDir, regionDir);
archiveRegion(fs, rootDir, tableDir, regionDir);
return null;
});
futures.add(future);
Expand Down Expand Up @@ -260,8 +258,8 @@ public static void archiveFamily(FileSystem fs, Configuration conf, RegionInfo p
* @param family the family hosting the store files
* @throws IOException if the files could not be correctly disposed.
*/
public static void archiveFamilyByFamilyDir(FileSystem fs, final Configuration conf,
RegionInfo parent, Path familyDir, byte[] family) throws IOException {
public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, RegionInfo parent,
Path familyDir, byte[] family) throws IOException {
FileStatus[] storeFiles = CommonFSUtils.listStatus(fs, familyDir);
if (storeFiles == null) {
LOG.debug("No files to dispose of in {}, family={}", parent.getRegionNameAsString(),
Expand All @@ -275,7 +273,7 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, final Configuration c

// do the actual archive
List<File> failedArchive =
resolveAndArchive(conf, fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
resolveAndArchive(fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
"Failed to archive/delete all the files for region:"
Expand All @@ -295,11 +293,10 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, final Configuration c
* attempted; otherwise likely to cause an {@link IOException}
* @throws IOException if the files could not be correctly disposed.
*/
public static void archiveStoreFiles(final Configuration conf, FileSystem fs,
RegionInfo regionInfo, Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles)
throws IOException {
public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo,
Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles) throws IOException {
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
archive(conf, fs, regionInfo, family, compactedFiles, storeArchiveDir);
archive(fs, regionInfo, family, compactedFiles, storeArchiveDir);
}

/**
Expand Down Expand Up @@ -330,11 +327,11 @@ public static void archiveRecoveredEdits(Configuration conf, FileSystem fs, Regi
"Wrong file system! Should be " + path.toUri().getScheme() + ", but got " + fs.getScheme());
}
path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, family);
archive(conf, fs, regionInfo, family, replayedEdits, path);
archive(fs, regionInfo, family, replayedEdits, path);
}

private static void archive(final Configuration conf, FileSystem fs, RegionInfo regionInfo,
byte[] family, Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
// sometimes in testing, we don't have rss, so we need to check for that
if (fs == null) {
LOG.warn(
Expand Down Expand Up @@ -368,8 +365,8 @@ private static void archive(final Configuration conf, FileSystem fs, RegionInfo
compactedFiles.stream().map(getStorePath).collect(Collectors.toList());

// do the actual archive
List<File> failedArchive = resolveAndArchive(conf, fs, storeArchiveDir, storeFiles,
EnvironmentEdgeManager.currentTime());
List<File> failedArchive =
resolveAndArchive(fs, storeArchiveDir, storeFiles, EnvironmentEdgeManager.currentTime());

if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
Expand Down Expand Up @@ -422,8 +419,8 @@ public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInf
* @return the list of failed to archive files.
* @throws IOException if an unexpected file operation exception occurred
*/
private static List<File> resolveAndArchive(final Configuration conf, FileSystem fs,
Path baseArchiveDir, Collection<File> toArchive, long start) throws IOException {
private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
Collection<File> toArchive, long start) throws IOException {
// short circuit if no files to move
if (toArchive.isEmpty()) {
return Collections.emptyList();
Expand All @@ -440,54 +437,33 @@ private static List<File> resolveAndArchive(final Configuration conf, FileSystem
LOG.trace("Created archive directory {}", baseArchiveDir);
}

List<File> failures = Collections.synchronizedList(new ArrayList<>());
List<File> failures = new ArrayList<>();
String startTime = Long.toString(start);
List<File> filesOnly = new ArrayList<>();
for (File file : toArchive) {
// if it's a file, archive it
try {
if (!file.isFile()) {
// if it's a directory, we need to archive all files
LOG.trace("Archiving {}", file);
if (file.isFile()) {
// attempt to archive the file
if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
failures.add(file);
}
} else {
// otherwise it's a directory and we need to archive all files
LOG.trace("{} is a directory, archiving children files", file);
// so we add the directory name to the one base archive
Path parentArchiveDir = new Path(baseArchiveDir, file.getName());
// and then get all the files from that directory and attempt to
// archive those too
Collection<File> children = file.getChildren();
failures.addAll(resolveAndArchive(conf, fs, parentArchiveDir, children, start));
} else {
filesOnly.add(file);
failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
}
} catch (IOException e) {
LOG.warn("Failed to archive {}", file, e);
failures.add(file);
}
}
Map<File, Future<Boolean>> futures = new HashMap<>();
// In current baseDir all files will be processed concurrently
for (File file : filesOnly) {
LOG.trace("Archiving {}", file);
Future<Boolean> archiveTask = getArchiveExecutor(conf)
.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
futures.put(file, archiveTask);
}

for (Map.Entry<File, Future<Boolean>> fileFutureEntry : futures.entrySet()) {
try {
boolean fileCleaned = fileFutureEntry.getValue().get();
if (!fileCleaned) {
LOG.warn("Couldn't archive {} into backup directory: {}", fileFutureEntry.getKey(),
baseArchiveDir);
failures.add(fileFutureEntry.getKey());
}
} catch (InterruptedException e) {
LOG.warn("HFileArchive Cleanup thread was interrupted");
failures.add(fileFutureEntry.getKey());
} catch (ExecutionException e) {
// this is IOException
LOG.warn("Failed to archive {}", fileFutureEntry.getKey(), e);
failures.add(fileFutureEntry.getKey());
}
}
return failures;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,7 @@ protected static void deleteFromFs(final MasterProcedureEnv env, final TableName
CommonFSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME), tableName);
Path regionDir = new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
if (fs.exists(regionDir)) {
HFileArchiver.archiveRegion(env.getMasterConfiguration(), fs, mfs.getRootDir(), mobTableDir,
regionDir);
HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir);
}

// Delete table directory from FS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ public static void deleteRegionFromFileSystem(final Configuration conf, final Fi

// Archive region
Path rootDir = CommonFSUtils.getRootDir(conf);
HFileArchiver.archiveRegion(conf, fs, rootDir, tableDir, regionDir);
HFileArchiver.archiveRegion(fs, rootDir, tableDir, regionDir);

// Delete empty region dir
if (!fs.delete(regionDir, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,7 @@ public void testCleaningRace() throws Exception {

try {
// Try to archive the file
HFileArchiver.archiveRegion(conf, fs, rootDir, sourceRegionDir.getParent(),
sourceRegionDir);
HFileArchiver.archiveRegion(fs, rootDir, sourceRegionDir.getParent(), sourceRegionDir);

// The archiver succeeded, the file is no longer in the original location
// but it's in the archive location.
Expand Down Expand Up @@ -692,8 +691,7 @@ public void testArchiveRegionTableAndRegionDirsNull() throws IOException {
Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
FileSystem fileSystem = UTIL.getTestFileSystem();
// Try to archive the file but with null regionDir, can't delete sourceFile
Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, null));
assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, null));
}

@Test
Expand All @@ -702,15 +700,14 @@ public void testArchiveRegionWithTableDirNull() throws IOException {
CommonFSUtils.getTableDir(new Path("./"), TableName.valueOf(name.getMethodName())), "xyzabc");
Path familyDir = new Path(regionDir, "rd");
Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
Path file = new Path(familyDir, "1");
Path sourceFile = new Path(rootDir, file);
FileSystem fileSystem = UTIL.getTestFileSystem();
fileSystem.createNewFile(sourceFile);
Path sourceRegionDir = new Path(rootDir, regionDir);
fileSystem.mkdirs(sourceRegionDir);
// Try to archive the file
assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, sourceRegionDir));
assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, sourceRegionDir));
assertFalse(fileSystem.exists(sourceRegionDir));
}

Expand All @@ -721,7 +718,6 @@ public void testArchiveRegionWithRegionDirNull() throws IOException {
"elgn4nf");
Path familyDir = new Path(regionDir, "rdar");
Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
Path file = new Path(familyDir, "2");
Path sourceFile = new Path(rootDir, file);
FileSystem fileSystem = UTIL.getTestFileSystem();
Expand All @@ -730,7 +726,7 @@ public void testArchiveRegionWithRegionDirNull() throws IOException {
fileSystem.mkdirs(sourceRegionDir);
// Try to archive the file but with null regionDir, can't delete sourceFile
assertFalse(
HFileArchiver.archiveRegion(conf, fileSystem, rootDir, sourceRegionDir.getParent(), null));
HFileArchiver.archiveRegion(fileSystem, rootDir, sourceRegionDir.getParent(), null));
assertTrue(fileSystem.exists(sourceRegionDir));
fileSystem.delete(sourceRegionDir, true);
}
Expand Down

0 comments on commit f69447b

Please sign in to comment.