HBASE-26079 Use StoreFileTracker when splitting and merging (apache#3617)

Signed-off-by: Duo Zhang <[email protected]>

Change-Id: I5e09f8cf9cc6511b47f68b9a7144831dc202be85
wchevreuil authored and Josh Elser committed Dec 22, 2021
1 parent 9e78a4e commit 1f0088d
Showing 12 changed files with 485 additions and 43 deletions.
File: MergeTableRegionsProcedure.java
@@ -24,6 +24,8 @@
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
@@ -54,6 +56,8 @@
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
@@ -585,44 +589,51 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
final FileSystem fs = mfs.getFileSystem();

+List<Path> mergedFiles = new ArrayList<>();
HRegionFileSystem mergeRegionFs = HRegionFileSystem.createRegionOnFileSystem(
env.getMasterConfiguration(), fs, tableDir, mergedRegion);

for (RegionInfo ri: this.regionsToMerge) {
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, tableDir, ri, false);
-mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion);
+mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion));
}
assert mergeRegionFs != null;
-mergeRegionFs.commitMergedRegion();
+mergeRegionFs.commitMergedRegion(mergedFiles, env);

// Prepare to create merged regions
env.getAssignmentManager().getRegionStates().
getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW);
}

-private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
+private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
final TableDescriptor htd = env.getMasterServices().getTableDescriptors()
.get(mergedRegion.getTable());
+List<Path> mergedFiles = new ArrayList<>();
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
String family = hcd.getNameAsString();
-final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
+Configuration trackerConfig =
+  StoreFileTrackerFactory.mergeConfigurations(env.getMasterConfiguration(), htd, hcd);
+StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
+  family, regionFs);
+final Collection<StoreFileInfo> storeFiles = tracker.load();
if (storeFiles != null && storeFiles.size() > 0) {
for (StoreFileInfo storeFileInfo : storeFiles) {
// Create reference file(s) to parent region file here in mergedDir.
// As this procedure is running on master, use CacheConfig.DISABLED means
// don't cache any block.
-mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
+Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
new HStoreFile(mergeRegionFs.getFileSystem(),
storeFileInfo.getPath(),
env.getMasterConfiguration(),
CacheConfig.DISABLED,
hcd.getBloomFilterType(), true));
+mergedFiles.add(refFile);
}
}
}
+return mergedFiles;
}

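The hunk above introduces the pattern that recurs throughout this patch: instead of listing a family directory through regionFs.getStoreFiles(family), master-side code now builds a per-store configuration and asks the factory for the configured tracker. A minimal sketch of the lookup, using only names the patch itself introduces:

    // htd/hcd are the table and column family descriptors, regionFs the
    // region's HRegionFileSystem, env the MasterProcedureEnv, all as above.
    Configuration trackerConfig =
        StoreFileTrackerFactory.mergeConfigurations(env.getMasterConfiguration(), htd, hcd);
    StoreFileTracker tracker =
        StoreFileTrackerFactory.create(trackerConfig, true /* isPrimaryReplica */, family, regionFs);
    Collection<StoreFileInfo> storeFiles = tracker.load(); // replaces regionFs.getStoreFiles(family)

With the default DefaultStoreFileTracker, load() still lists the filesystem, so behavior is unchanged unless hbase.store.file-tracker.impl selects another implementation.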
File: SplitTableRegionProcedure.java
@@ -65,6 +65,8 @@
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.RegionSplitRestriction;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -622,19 +624,19 @@ public void createDaughterRegions(final MasterProcedureEnv env) throws IOException
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);

-Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
+Pair<List<Path>, List<Path>> expectedReferences = splitStoreFiles(env, regionFs);

-assertSplitResultFilesCount(fs, expectedReferences.getFirst(),
+assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
regionFs.getSplitsDir(daughterOneRI));
//Move the files from the temporary .splits to the final /table/region directory
-regionFs.commitDaughterRegion(daughterOneRI);
-assertSplitResultFilesCount(fs, expectedReferences.getFirst(),
+regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
+assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
new Path(tabledir, daughterOneRI.getEncodedName()));

-assertSplitResultFilesCount(fs, expectedReferences.getSecond(),
+assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
regionFs.getSplitsDir(daughterTwoRI));
-regionFs.commitDaughterRegion(daughterTwoRI);
-assertSplitResultFilesCount(fs, expectedReferences.getSecond(),
+regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
+assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
new Path(tabledir, daughterTwoRI.getEncodedName()));
}
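createDaughterRegions now hands the concrete reference-file paths, not just their counts, to commitDaughterRegion, so each daughter's StoreFileTracker can record the files as part of the commit. The resulting flow, condensed from the hunk above:

    Pair<List<Path>, List<Path>> refs = splitStoreFiles(env, regionFs);
    // Each commit moves that daughter's reference files out of the temporary
    // .splits directory and registers them with the daughter's tracker.
    regionFs.commitDaughterRegion(daughterOneRI, refs.getFirst(), env);
    regionFs.commitDaughterRegion(daughterTwoRI, refs.getSecond(), env);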

@@ -651,8 +653,9 @@ private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOException
* Create Split directory
* @param env MasterProcedureEnv
*/
-private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
+private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv env,
final HRegionFileSystem regionFs) throws IOException {
+final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Configuration conf = env.getMasterConfiguration();
TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
// The following code sets up a thread pool executor with as many slots as
@@ -667,7 +670,11 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) {
String family = cfd.getNameAsString();
-Collection<StoreFileInfo> sfis = regionFs.getStoreFiles(family);
+Configuration trackerConfig = StoreFileTrackerFactory.
+  mergeConfigurations(env.getMasterConfiguration(), htd, htd.getColumnFamily(cfd.getName()));
+StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
+  family, regionFs);
+Collection<StoreFileInfo> sfis = tracker.load();
if (sfis == null) {
continue;
}
@@ -693,7 +700,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
}
if (nbFiles == 0) {
// no file needs to be splitted.
-return new Pair<Integer, Integer>(0, 0);
+return new Pair<>(Collections.emptyList(), Collections.emptyList());
}
// Max #threads is the smaller of the number of storefiles or the default max determined above.
int maxThreads = Math.min(
@@ -717,8 +724,9 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
// As this procedure is running on master, use CacheConfig.DISABLED means
// don't cache any block.
StoreFileSplitter sfs =
-new StoreFileSplitter(regionFs, familyName, new HStoreFile(
-  storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
+new StoreFileSplitter(regionFs, familyName, new HStoreFile(mfs.getFileSystem(),
+  storeFileInfo.getPath(), conf, CacheConfig.DISABLED,
+  hcd.getBloomFilterType(), true));
futures.add(threadPool.submit(sfs));
}
}
@@ -746,14 +754,18 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}

-int daughterA = 0;
-int daughterB = 0;
+List<Path> daughterA = new ArrayList<>();
+List<Path> daughterB = new ArrayList<>();
// Look for any exception
for (Future<Pair<Path, Path>> future : futures) {
try {
Pair<Path, Path> p = future.get();
-daughterA += p.getFirst() != null ? 1 : 0;
-daughterB += p.getSecond() != null ? 1 : 0;
+if(p.getFirst() != null){
+  daughterA.add(p.getFirst());
+}
+if(p.getSecond() != null){
+  daughterB.add(p.getSecond());
+}
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
@@ -766,7 +778,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
getParentRegion().getShortNameToLog() + " Daughter A: " + daughterA +
" storefiles, Daughter B: " + daughterB + " storefiles.");
}
-return new Pair<Integer, Integer>(daughterA, daughterB);
+return new Pair<>(daughterA, daughterB);
}

private void assertSplitResultFilesCount(final FileSystem fs,
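A side effect of switching daughterA and daughterB from counts to List<Path>: the debug message above now interpolates full path lists while still reading "N storefiles". A count-preserving variant (a suggestion, not part of the patch) would be:

    LOG.debug("Split storefiles for region " + getParentRegion().getShortNameToLog() +
        " Daughter A: " + daughterA.size() + " storefiles, Daughter B: " +
        daughterB.size() + " storefiles.");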
File: HRegionFileSystem.java
@@ -24,7 +24,9 @@
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
@@ -49,6 +51,9 @@
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -595,19 +600,46 @@ void cleanupDaughterRegion(final RegionInfo regionInfo) throws IOException {
* @param regionInfo daughter {@link org.apache.hadoop.hbase.client.RegionInfo}
* @throws IOException
*/
-public Path commitDaughterRegion(final RegionInfo regionInfo)
-    throws IOException {
+public Path commitDaughterRegion(final RegionInfo regionInfo, List<Path> allRegionFiles,
+    MasterProcedureEnv env) throws IOException {
Path regionDir = this.getSplitsDir(regionInfo);
if (fs.exists(regionDir)) {
// Write HRI to a file in case we need to recover hbase:meta
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
+HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
+  env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
+insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
}

return regionDir;
}

+private void insertRegionFilesIntoStoreTracker(List<Path> allFiles, MasterProcedureEnv env,
+  HRegionFileSystem regionFs) throws IOException {
+  TableDescriptor tblDesc = env.getMasterServices().getTableDescriptors().
+    get(regionInfo.getTable());
+  //we need to map trackers per store
+  Map<String, StoreFileTracker> trackerMap = new HashMap<>();
+  //we need to map store files per store
+  Map<String, List<StoreFileInfo>> fileInfoMap = new HashMap<>();
+  for(Path file : allFiles) {
+    String familyName = file.getParent().getName();
+    trackerMap.computeIfAbsent(familyName, t -> {
+      Configuration config = StoreFileTrackerFactory.mergeConfigurations(conf, tblDesc,
+        tblDesc.getColumnFamily(Bytes.toBytes(familyName)));
+      return StoreFileTrackerFactory.
+        create(config, true, familyName, regionFs);
+    });
+    fileInfoMap.computeIfAbsent(familyName, l -> new ArrayList<>());
+    List<StoreFileInfo> infos = fileInfoMap.get(familyName);
+    infos.add(new StoreFileInfo(conf, fs, fs.getFileStatus(file)));
+  }
+  for(Map.Entry<String, StoreFileTracker> entry : trackerMap.entrySet()) {
+    entry.getValue().add(fileInfoMap.get(entry.getKey()));
+  }
+}

/**
* Creates region split daughter directories under the table dir. If the daughter regions already
* exist, for example, in the case of a recovery from a previous failed split procedure, this
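A small note on the grouping loop in insertRegionFilesIntoStoreTracker above: computeIfAbsent returns the mapped value, so the separate get could be folded in. A behavior-identical sketch:

    fileInfoMap.computeIfAbsent(familyName, k -> new ArrayList<>())
        .add(new StoreFileInfo(conf, fs, fs.getFileStatus(file)));

Grouping per family and calling tracker.add() once per store keeps each tracker to a single batch update rather than one write per file, which matters for implementations that persist their file list.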
@@ -796,13 +828,15 @@ public Path mergeStoreFile(RegionInfo mergingRegion, String familyName, HStoreFile
* Commit a merged region, making it ready for use.
* @throws IOException
*/
-public void commitMergedRegion() throws IOException {
+public void commitMergedRegion(List<Path> allMergedFiles, MasterProcedureEnv env)
+    throws IOException {
Path regionDir = getMergesDir(regionInfoForFs);
if (regionDir != null && fs.exists(regionDir)) {
// Write HRI to a file in case we need to recover hbase:meta
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
+insertRegionFilesIntoStoreTracker(allMergedFiles, env, this);
}
}

File: DefaultStoreFileTracker.java
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.yetus.audience.InterfaceAudience;
@@ -32,8 +33,7 @@
@InterfaceAudience.Private
class DefaultStoreFileTracker extends StoreFileTrackerBase {

-public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
-  StoreContext ctx) {
+public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
super(conf, isPrimaryReplica, ctx);
}

File: StoreFileTracker.java
@@ -48,7 +48,6 @@
*/
@InterfaceAudience.Private
public interface StoreFileTracker {

/**
* Load the store files list when opening a region.
*/
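For context, the two tracker operations this patch exercises are load(), used by the merge and split procedures to enumerate a store's files, and add(), used by insertRegionFilesIntoStoreTracker to record newly created reference files. Their signatures as the patch uses them (a sketch of the relevant slice of the interface):

    List<StoreFileInfo> load() throws IOException;                    // enumerate current files
    void add(Collection<StoreFileInfo> newFiles) throws IOException;  // record new files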
File: StoreFileTrackerFactory.java
@@ -18,22 +18,51 @@
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

/**
* Factory method for creating store file tracker.
*/
@InterfaceAudience.Private
public final class StoreFileTrackerFactory {

public static final String TRACK_IMPL = "hbase.store.file-tracker.impl";
+private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerFactory.class);

public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica,
-      StoreContext ctx) {
+    StoreContext ctx) {
Class<? extends StoreFileTracker> tracker =
conf.getClass(TRACK_IMPL, DefaultStoreFileTracker.class, StoreFileTracker.class);
+LOG.info("instantiating StoreFileTracker impl {}", tracker.getName());
return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
}

+public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica, String family,
+  HRegionFileSystem regionFs) {
+  ColumnFamilyDescriptorBuilder fDescBuilder =
+    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family));
+  StoreContext ctx = StoreContext.getBuilder().
+    withColumnFamilyDescriptor(fDescBuilder.build()).
+    withRegionFileSystem(regionFs).
+    build();
+  return StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx);
+}

+public static Configuration mergeConfigurations(Configuration global,
+  TableDescriptor table, ColumnFamilyDescriptor family) {
+  return new CompoundConfiguration()
+    .add(global)
+    .addBytesMap(table.getValues())
+    .addStringMap(family.getConfiguration())
+    .addBytesMap(family.getValues());
+}
}
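The factory is the single switch point for this feature: hbase.store.file-tracker.impl may name any StoreFileTracker implementation, and mergeConfigurations resolves that setting per family, per table, or globally. CompoundConfiguration gives precedence to later additions, so family values override table values, which in turn override the global configuration. A hypothetical override, for illustration only:

    Configuration conf = HBaseConfiguration.create();
    // The class name below is made up. Any implementation exposing a
    // (Configuration, boolean, StoreContext) constructor should work, since
    // that is what ReflectionUtils.newInstance() is handed in create() above.
    conf.set(StoreFileTrackerFactory.TRACK_IMPL, "org.example.MyStoreFileTracker");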
File: TestDefaultStoreEngine.java
@@ -67,6 +67,7 @@ public void testCustomParts() throws Exception {
DummyStoreFlusher.class.getName());
HRegion mockRegion = Mockito.mock(HRegion.class);
HStore mockStore = Mockito.mock(HStore.class);
+mockStore.conf = conf;
Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion);
StoreEngine<?, ?, ?, ?> se =
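The one-line test change above appears to follow from the new tracker wiring: code reached by the store engine under test now reads the store's conf field when building its StoreFileTracker, and because HStore is a Mockito mock here, that field must be set by hand (Mockito stubs method calls; fields stay ordinary fields):

    HStore mockStore = Mockito.mock(HStore.class);
    mockStore.conf = conf; // plain field write on the mock instance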
[Diffs for the remaining five changed files did not load.]