HBASE-26079 Use StoreFileTracker when splitting and merging #3617

Merged: 13 commits, Sep 8, 2021
File: MergeTableRegionsProcedure.java
@@ -24,6 +24,8 @@
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
@@ -53,6 +55,8 @@
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
@@ -584,40 +588,47 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
final FileSystem fs = mfs.getFileSystem();

+List<Path> mergedFiles = new ArrayList<>();
HRegionFileSystem mergeRegionFs = HRegionFileSystem.createRegionOnFileSystem(
env.getMasterConfiguration(), fs, tableDir, mergedRegion);

for (RegionInfo ri: this.regionsToMerge) {
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, tableDir, ri, false);
-mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion);
+mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion));
}
assert mergeRegionFs != null;
-mergeRegionFs.commitMergedRegion();
+mergeRegionFs.commitMergedRegion(mergedFiles, env);

// Prepare to create merged regions
env.getAssignmentManager().getRegionStates().
getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW);
}

-private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
+private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
final TableDescriptor htd = env.getMasterServices().getTableDescriptors()
.get(mergedRegion.getTable());
+List<Path> mergedFiles = new ArrayList<>();
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
String family = hcd.getNameAsString();
-final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
+Configuration trackerConfig =
+  StoreFileTrackerFactory.mergeConfigurations(env.getMasterConfiguration(), htd, hcd);
+StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
+  family, regionFs);
+final Collection<StoreFileInfo> storeFiles = tracker.load();
if (storeFiles != null && storeFiles.size() > 0) {
for (StoreFileInfo storeFileInfo : storeFiles) {
// Create reference file(s) to parent region file here in mergedDir.
// As this procedure is running on master, use CacheConfig.DISABLED means
// don't cache any block.
-mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
+Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
+mergedFiles.add(refFile);
}
}
}
+return mergedFiles;
}

/**
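Note: the merge-side changes above all follow one pattern: rather than listing a family's files straight off the filesystem with regionFs.getStoreFiles(family), the procedure asks a StoreFileTracker resolved from the merged table/column-family configuration. A minimal sketch of that pattern (the same shape recurs in the split procedure below); the wrapper class is illustrative, every call in it comes from the diff:

    import java.io.IOException;
    import java.util.Collection;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
    import org.apache.hadoop.hbase.client.TableDescriptor;
    import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
    import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
    import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
    import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
    import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;

    // Illustrative helper, not part of the patch.
    final class TrackerLookupSketch {
      static Collection<StoreFileInfo> loadStoreFiles(MasterProcedureEnv env, TableDescriptor htd,
          ColumnFamilyDescriptor hcd, HRegionFileSystem regionFs) throws IOException {
        // Merge cluster, table and column-family settings so a per-family tracker
        // override wins over table- and cluster-level defaults.
        Configuration trackerConfig =
            StoreFileTrackerFactory.mergeConfigurations(env.getMasterConfiguration(), htd, hcd);
        // isPrimaryReplica = true: these master-side procedures operate on primary regions.
        StoreFileTracker tracker =
            StoreFileTrackerFactory.create(trackerConfig, true, hcd.getNameAsString(), regionFs);
        // load() replaces the direct directory listing; a tracker-backed store can
        // answer from persisted metadata instead of a FileSystem scan.
        return tracker.load();
      }
    }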
File: SplitTableRegionProcedure.java
@@ -64,6 +64,8 @@
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.RegionSplitRestriction;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -618,21 +620,20 @@ public void createDaughterRegions(final MasterProcedureEnv env) throws IOException
final FileSystem fs = mfs.getFileSystem();
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);

regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);

-Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
+Pair<List<Path>, List<Path>> expectedReferences = splitStoreFiles(env, regionFs);

-assertReferenceFileCount(fs, expectedReferences.getFirst(),
+assertReferenceFileCount(fs, expectedReferences.getFirst().size(),
regionFs.getSplitsDir(daughterOneRI));
-regionFs.commitDaughterRegion(daughterOneRI);
-assertReferenceFileCount(fs, expectedReferences.getFirst(),
+regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
+assertReferenceFileCount(fs, expectedReferences.getFirst().size(),
new Path(tabledir, daughterOneRI.getEncodedName()));

-assertReferenceFileCount(fs, expectedReferences.getSecond(),
+assertReferenceFileCount(fs, expectedReferences.getSecond().size(),
regionFs.getSplitsDir(daughterTwoRI));
-regionFs.commitDaughterRegion(daughterTwoRI);
-assertReferenceFileCount(fs, expectedReferences.getSecond(),
+regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
+assertReferenceFileCount(fs, expectedReferences.getSecond().size(),
new Path(tabledir, daughterTwoRI.getEncodedName()));
}

@@ -649,7 +650,7 @@ private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOException
* Create Split directory
* @param env MasterProcedureEnv
*/
-private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
+private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv env,
final HRegionFileSystem regionFs) throws IOException {
final Configuration conf = env.getMasterConfiguration();
TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
@@ -665,7 +666,11 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) {
String family = cfd.getNameAsString();
-Collection<StoreFileInfo> sfis = regionFs.getStoreFiles(family);
+Configuration trackerConfig = StoreFileTrackerFactory.
+  mergeConfigurations(env.getMasterConfiguration(), htd, htd.getColumnFamily(cfd.getName()));
+StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
+  family, regionFs);
+Collection<StoreFileInfo> sfis = tracker.load();
if (sfis == null) {
continue;
}
@@ -691,7 +696,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
}
if (nbFiles == 0) {
// no file needs to be split.
-return new Pair<Integer, Integer>(0, 0);
+return new Pair<>(Collections.emptyList(), Collections.emptyList());
}
// Max #threads is the smaller of the number of storefiles or the default max determined above.
int maxThreads = Math.min(
@@ -744,14 +749,18 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}

-int daughterA = 0;
-int daughterB = 0;
+List<Path> daughterA = new ArrayList<>();
+List<Path> daughterB = new ArrayList<>();
// Look for any exception
for (Future<Pair<Path, Path>> future : futures) {
try {
Pair<Path, Path> p = future.get();
-daughterA += p.getFirst() != null ? 1 : 0;
-daughterB += p.getSecond() != null ? 1 : 0;
+if (p.getFirst() != null) {
+  daughterA.add(p.getFirst());
+}
+if (p.getSecond() != null) {
+  daughterB.add(p.getSecond());
+}
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
@@ -764,7 +773,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
getParentRegion().getShortNameToLog() + " Daughter A: " + daughterA +
" storefiles, Daughter B: " + daughterB + " storefiles.");
}
-return new Pair<Integer, Integer>(daughterA, daughterB);
+return new Pair<>(daughterA, daughterB);
}

private void assertReferenceFileCount(final FileSystem fs, final int expectedReferenceFileCount,
File: HRegionFileSystem.java
@@ -23,7 +23,9 @@
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
@@ -45,6 +47,9 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -592,19 +597,46 @@ void cleanupDaughterRegion(final RegionInfo regionInfo) throws IOException {
* @param regionInfo daughter {@link org.apache.hadoop.hbase.client.RegionInfo}
* @throws IOException
*/
-public Path commitDaughterRegion(final RegionInfo regionInfo)
-  throws IOException {
+public Path commitDaughterRegion(final RegionInfo regionInfo, List<Path> allRegionFiles,
+  MasterProcedureEnv env) throws IOException {
Path regionDir = this.getSplitsDir(regionInfo);
if (fs.exists(regionDir)) {
// Write HRI to a file in case we need to recover hbase:meta
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
+HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
+  env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
+insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
}

return regionDir;
}

+private void insertRegionFilesIntoStoreTracker(List<Path> allFiles, MasterProcedureEnv env,
+  HRegionFileSystem regionFs) throws IOException {
+  TableDescriptor tblDesc = env.getMasterServices().getTableDescriptors().
+    get(regionInfo.getTable());
+  // we need to map trackers per store
+  Map<String, StoreFileTracker> trackerMap = new HashMap<>();
+  // we need to map store files per store
+  Map<String, List<StoreFileInfo>> fileInfoMap = new HashMap<>();
+  for (Path file : allFiles) {
+    String familyName = file.getParent().getName();
+    trackerMap.computeIfAbsent(familyName, t -> {
+      Configuration config = StoreFileTrackerFactory.mergeConfigurations(conf, tblDesc,
+        tblDesc.getColumnFamily(Bytes.toBytes(familyName)));
+      return StoreFileTrackerFactory.create(config, true, familyName, regionFs);
+    });
+    fileInfoMap.computeIfAbsent(familyName, l -> new ArrayList<>());
+    List<StoreFileInfo> infos = fileInfoMap.get(familyName);
+    infos.add(new StoreFileInfo(conf, fs, file, true));
+  }
+  for (Map.Entry<String, StoreFileTracker> entry : trackerMap.entrySet()) {
+    entry.getValue().add(fileInfoMap.get(entry.getKey()));
+  }
+}

/**
* Creates region split daughter directories under the table dir. If the daughter regions already
* exist, for example, in the case of a recovery from a previous failed split procedure, this
@@ -755,13 +787,15 @@ public Path mergeStoreFile(RegionInfo mergingRegion, String familyName, HStoreFile
* Commit a merged region, making it ready for use.
* @throws IOException
*/
-public void commitMergedRegion() throws IOException {
+public void commitMergedRegion(List<Path> allMergedFiles, MasterProcedureEnv env)
+  throws IOException {
Path regionDir = getMergesDir(regionInfoForFs);
if (regionDir != null && fs.exists(regionDir)) {
// Write HRI to a file in case we need to recover hbase:meta
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
+insertRegionFilesIntoStoreTracker(allMergedFiles, env, this);
}
}

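The commit-side counterpart: commitDaughterRegion and commitMergedRegion both hand the freshly written reference files to insertRegionFilesIntoStoreTracker, so a tracker that persists its file list learns about the new files as part of the commit instead of by re-scanning directories. A compressed sketch of that handoff for a single column family; the helper class and the names tracker, conf, fs and newFilesForFamily are stand-ins for the values the method derives per family:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
    import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;

    // Illustrative helper, not part of the patch.
    final class TrackerCommitSketch {
      static void register(StoreFileTracker tracker, Configuration conf, FileSystem fs,
          List<Path> newFilesForFamily) throws IOException {
        List<StoreFileInfo> infos = new ArrayList<>();
        for (Path file : newFilesForFamily) {
          // true = primary replica, mirroring the StoreFileInfo construction in the diff.
          infos.add(new StoreFileInfo(conf, fs, file, true));
        }
        // One add() per family keeps the tracker's persisted list in step with the commit.
        tracker.add(infos);
      }
    }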
File: DefaultStoreFileTracker.java
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.yetus.audience.InterfaceAudience;
@@ -32,8 +33,7 @@
@InterfaceAudience.Private
class DefaultStoreFileTracker extends StoreFileTrackerBase {

-public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
-  StoreContext ctx) {
+public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
Contributor: Do not need to touch this file?

super(conf, isPrimaryReplica, ctx);
}

File: StoreFileTracker.java
@@ -48,7 +48,6 @@
*/
@InterfaceAudience.Private
public interface StoreFileTracker {

Contributor: Ditto.

/**
* Load the store files list when opening a region.
*/
File: StoreFileTrackerFactory.java
@@ -18,22 +18,51 @@
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

/**
* Factory method for creating store file tracker.
*/
@InterfaceAudience.Private
public final class StoreFileTrackerFactory {

public static final String TRACK_IMPL = "hbase.store.file-tracker.impl";
+private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerFactory.class);

public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica,
StoreContext ctx) {
Class<? extends StoreFileTracker> tracker =
conf.getClass(TRACK_IMPL, DefaultStoreFileTracker.class, StoreFileTracker.class);
+LOG.info("instantiating StoreFileTracker impl {}", tracker.getName());
return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
}

+public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica, String family,
+  HRegionFileSystem regionFs) {
+  ColumnFamilyDescriptorBuilder fDescBuilder =
+    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family));
+  StoreContext ctx = StoreContext.getBuilder().
+    withColumnFamilyDescriptor(fDescBuilder.build()).
+    withRegionFileSystem(regionFs).
+    build();
+  return StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx);
+}

+public static Configuration mergeConfigurations(Configuration global,
+  TableDescriptor table, ColumnFamilyDescriptor family) {
+  return new CompoundConfiguration()
+    .add(global)
+    .addBytesMap(table.getValues())
+    .addStringMap(family.getConfiguration())
+    .addBytesMap(family.getValues());
+}
}

Review thread on mergeConfigurations:

Contributor: We'd better merge all the configurations? For example, if the tracker itself needs some configurations...

Contributor: This is what we have in HStore, for initializing the Configuration:

    // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
    // CompoundConfiguration will look for keys in reverse order of addition, so we'd
    // add global config first, then table and cf overrides, then cf metadata.
    this.conf = new CompoundConfiguration()
      .add(confParam)
      .addBytesMap(region.getTableDescriptor().getValues())
      .addStringMap(family.getConfiguration())
      .addBytesMap(family.getValues());

Contributor: Let's extract the above code into a util method and use it also for merging configurations in MergeTableRegionsProcedure and SplitTableRegionProcedure?

Author: Sorry, missed this suggestion before pushing up the last commit. Let me change this.

Author: Changed the implementation to use CompoundConfiguration accordingly. Sorry, I'm not sure where you think this should be used. We are already calling StoreFileTrackerFactory.mergeConfigurations from the MergeTableRegionsProcedure.mergeStoreFiles, SplitTableRegionProcedure.splitStoreFiles and HRegionFileSystem.insertRegionFilesIntoStoreTracker methods. I believe these are the only places we would require merging configs, as we need to obtain a StoreFileTracker instance from the factory.

Contributor: I mean we could extract a util method, so we do not need to write the logic twice in both here and the constructor of HStore. Not a big deal, can do it later.
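The precedence discussed in the thread above is easy to check: CompoundConfiguration resolves keys in reverse order of addition, so a column-family setting shadows the table setting, which shadows the global one. A self-contained check; the tracker class names used as values here are placeholders, not real implementations:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.TableDescriptor;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
    import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
    import org.apache.hadoop.hbase.util.Bytes;

    public final class MergeConfigPrecedenceCheck {
      public static void main(String[] args) {
        Configuration global = new Configuration(false);
        global.set(StoreFileTrackerFactory.TRACK_IMPL, "GlobalTracker");        // cluster default

        TableDescriptor table = TableDescriptorBuilder.newBuilder(TableName.valueOf("t"))
            .setValue(StoreFileTrackerFactory.TRACK_IMPL, "TableTracker")       // table override
            .build();

        ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
            .setConfiguration(StoreFileTrackerFactory.TRACK_IMPL, "CfTracker")  // family override
            .build();

        Configuration merged = StoreFileTrackerFactory.mergeConfigurations(global, table, family);
        // Prints "CfTracker": the family configuration map is added last, so it wins.
        System.out.println(merged.get(StoreFileTrackerFactory.TRACK_IMPL));
      }
    }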
File: TestStoreEngine.java
@@ -67,6 +67,7 @@ public void testCustomParts() throws Exception {
DummyStoreFlusher.class.getName());
HRegion mockRegion = Mockito.mock(HRegion.class);
HStore mockStore = Mockito.mock(HStore.class);
+mockStore.conf = conf;
Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion);
StoreEngine<?, ?, ?, ?> se =