Skip to content

Commit

Permalink
Create ref files using threadpool as part of merge
Browse files Browse the repository at this point in the history
  • Loading branch information
Prathyusha Garre committed Oct 31, 2023
1 parent 4f3e63e commit 2c881f6
Showing 1 changed file with 68 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@
package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
Expand All @@ -53,14 +61,18 @@
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.quotas.QuotaExceededException;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -573,15 +585,57 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
final FileSystem fs = mfs.getFileSystem();
List<Path> mergedFiles = new ArrayList<>();
HRegionFileSystem mergeRegionFs = HRegionFileSystem
.createRegionOnFileSystem(env.getMasterConfiguration(), fs, tableDir, mergedRegion);

Configuration conf = env.getMasterConfiguration();
int numOfThreads = conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT));
List<Path> mergedFiles = new ArrayList<Path>();
final ExecutorService threadPool = Executors.newFixedThreadPool(numOfThreads,
new ThreadFactoryBuilder().setNameFormat("StoreFileMerge-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
final List<Future<Path>> futures = new ArrayList<Future<Path>>();
for (RegionInfo ri : this.regionsToMerge) {
HRegionFileSystem regionFs = HRegionFileSystem
.openRegionFromFileSystem(env.getMasterConfiguration(), fs, tableDir, ri, false);
mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion));
mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion, threadPool, futures);
}
// Shutdown the pool
threadPool.shutdown();

// Wait for all the tasks to finish.
// When splits ran on the RegionServer, how-long-to-wait-configuration was named
// hbase.regionserver.fileSplitTimeout. If set, use its value.
long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
try {
boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS);
if (stillRunning) {
threadPool.shutdownNow();
// wait for the thread to shutdown completely.
while (!threadPool.isTerminated()) {
Thread.sleep(50);
}
throw new IOException(
"Took too long to merge the" + " files and create the references, aborting merge");
}
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}

for (Future<Path> future : futures) {
try {
Path path = future.get();
if (path != null) {
mergedFiles.add(path);
}
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
throw new IOException(e);
}
}
assert mergeRegionFs != null;
mergeRegionFs.commitMergedRegion(mergedFiles, env);

Expand All @@ -590,11 +644,12 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
.setState(State.MERGING_NEW);
}

private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion, ExecutorService threadPool,
List<Future<Path>> futures) throws IOException {
final TableDescriptor htd =
env.getMasterServices().getTableDescriptors().get(mergedRegion.getTable());
List<Path> mergedFiles = new ArrayList<>();
// List<Path> mergedFiles = new ArrayList<>();
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
String family = hcd.getNameAsString();
StoreFileTracker tracker =
Expand All @@ -611,13 +666,17 @@ private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem reg
// is running in a regionserver's Store context, or we might not be able
// to read the hfiles.
storeFileInfo.setConf(storeConfiguration);
Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
mergedFiles.add(refFile);
futures.add(threadPool.submit(new Callable<Path>() {
@Override
public Path call() throws Exception {
// TODO Auto-generated method stub
return mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
}
}));
}
}
}
return mergedFiles;
}

/**
Expand Down

0 comments on commit 2c881f6

Please sign in to comment.