HBASE-26640 Reimplement master local region initialization to better work with SFT (#4111)

Signed-off-by: Josh Elser <[email protected]>
Signed-off-by: Wellington Chevreuil <[email protected]>
Apache9 committed Feb 24, 2022
1 parent e1131a2 commit 4cdb380
Showing 9 changed files with 371 additions and 37 deletions.
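
Note: at a high level, the patch replaces the old bootstrap scheme for the master local region (create the region under a "-tmp" directory, then rename it into place) with two marker files inside the region's table directory: .initializing is created before any data is written, and .initialized is created only once bootstrap has fully completed, at which point .initializing is removed. On startup the master can then tell apart a fresh install, a bootstrap that crashed halfway, and a healthy region, and it can migrate the store file tracker (SFT) when the configured tracker differs from the one recorded on disk. The standalone sketch below models only the flag protocol, with java.nio.file standing in for the HBase FileSystem API; the class and enum names are illustrative, not part of the patch.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class FlagProtocolSketch {

  enum Action { BOOTSTRAP, REDO_BOOTSTRAP, UPGRADE_THEN_OPEN, OPEN }

  // Mirrors the decision tree in MasterRegion.create() (names illustrative).
  static Action decide(Path tableDir) {
    Path initializing = tableDir.resolve(".initializing");
    Path initialized = tableDir.resolve(".initialized");
    if (!Files.exists(tableDir)) {
      // fresh install: touch .initializing, bootstrap, then touch .initialized
      return Action.BOOTSTRAP;
    }
    if (!Files.exists(initialized)) {
      if (!Files.exists(initializing)) {
        // layout written by an older version, before the flags existed:
        // record the DEFAULT tracker, migrate if needed, then open
        return Action.UPGRADE_THEN_OPEN;
      }
      // a previous bootstrap crashed halfway: wipe everything except the
      // .initializing flag and bootstrap again
      return Action.REDO_BOOTSTRAP;
    }
    // normal restart: delete a stale .initializing flag if one is left over
    return Action.OPEN;
  }

  public static void main(String[] args) throws IOException {
    Path tableDir = Files.createTempDirectory("demo").resolve("master-region");
    System.out.println(decide(tableDir)); // BOOTSTRAP: table dir absent
    Files.createDirectories(tableDir);
    Files.createFile(tableDir.resolve(".initializing"));
    System.out.println(decide(tableDir)); // REDO_BOOTSTRAP: crashed mid-bootstrap
    Files.createFile(tableDir.resolve(".initialized"));
    Files.delete(tableDir.resolve(".initializing"));
    System.out.println(decide(tableDir)); // OPEN: fully initialized
  }
}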
hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -772,7 +772,7 @@ private void tryMigrateMetaLocationsFromZooKeeper() throws IOException, KeeperException {
       // done with in a one row put, which means if we have data in catalog family then we can
       // make sure that the migration is done.
       LOG.info("The {} family in master local region already has data in it, skip migrating...",
-        HConstants.CATALOG_FAMILY);
+        HConstants.CATALOG_FAMILY_STR);
       return;
     }
   }
@@ -4072,7 +4072,7 @@ public MetaLocationSyncer getMetaLocationSyncer() {

   @RestrictedApi(explanation = "Should only be called in tests", link = "",
       allowedOnPath = ".*/src/test/.*")
-  MasterRegion getMasterRegion() {
+  public MasterRegion getMasterRegion() {
     return masterRegion;
   }
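Note: the HMaster.java changes are a log fix plus test plumbing: the message now passes HConstants.CATALOG_FAMILY_STR (a String) instead of the byte[] constant so the family name renders as text, and getMasterRegion() becomes public so tests outside the class can reach the region, while the @RestrictedApi annotation still rejects callers that are not under a src/test path. A hypothetical test-side helper this enables (UTIL is assumed to be a started HBaseTestingUtility; this fragment belongs in a test class under src/test):

// hypothetical helper in a test class under src/test
private MasterRegion masterRegion() {
  return UTIL.getMiniHBaseCluster().getMaster().getMasterRegion();
}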
hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
@@ -20,27 +20,34 @@
 import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
 
 import java.io.IOException;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
@@ -92,6 +99,10 @@ public final class MasterRegion {

   private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
 
+  static final String INITIALIZING_FLAG = ".initializing";
+
+  static final String INITIALIZED_FLAG = ".initialized";
+
   private static final int REGION_ID = 1;
 
   private final WALFactory walFactory;
@@ -196,32 +207,39 @@ private static WAL createWAL(WALFactory walFactory, MasterRegionWALRoller walRoller,

   private static HRegion bootstrap(Configuration conf, TableDescriptor td, FileSystem fs,
     Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
-    MasterRegionWALRoller walRoller, String serverName) throws IOException {
+    MasterRegionWALRoller walRoller, String serverName, boolean touchInitializingFlag)
+    throws IOException {
     TableName tn = td.getTableName();
     RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setRegionId(REGION_ID).build();
-    Path tmpTableDir = CommonFSUtils.getTableDir(rootDir,
-      TableName.valueOf(tn.getNamespaceAsString(), tn.getQualifierAsString() + "-tmp"));
-    if (fs.exists(tmpTableDir) && !fs.delete(tmpTableDir, true)) {
-      throw new IOException("Can not delete partial created proc region " + tmpTableDir);
-    }
-    HRegion.createHRegion(conf, regionInfo, fs, tmpTableDir, td).close();
     Path tableDir = CommonFSUtils.getTableDir(rootDir, tn);
-    if (!fs.rename(tmpTableDir, tableDir)) {
-      throw new IOException("Can not rename " + tmpTableDir + " to " + tableDir);
-    }
+    // persist table descriptor
+    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, true);
+    HRegion.createHRegion(conf, regionInfo, fs, tableDir, td).close();
+    Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
+    if (!fs.mkdirs(initializedFlag)) {
+      throw new IOException("Can not touch initialized flag: " + initializedFlag);
+    }
+    Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
+    if (!fs.delete(initializingFlag, true)) {
+      LOG.warn("failed to clean up initializing flag: " + initializingFlag);
+    }
     WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
     return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
   }
 
-  private static HRegion open(Configuration conf, TableDescriptor td, FileSystem fs, Path rootDir,
-    FileSystem walFs, Path walRootDir, WALFactory walFactory, MasterRegionWALRoller walRoller,
-    String serverName) throws IOException {
-    Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
-    Path regionDir =
-      fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
-        .getPath();
-    RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+  private static RegionInfo loadRegionInfo(FileSystem fs, Path tableDir) throws IOException {
+    // on branch-2, RegionInfo.isEncodedRegionName will return true for .initializing and
+    // .initialized, see HBASE-25368. Since RegionInfo is IA.Public, changing the implementation
+    // may raise compatibility concerns, so here we just skip them ourselves.
+    Path regionDir = fs.listStatus(tableDir, p -> !p.getName().startsWith(".")
+      && RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0].getPath();
+    return HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+  }
+
+  private static HRegion open(Configuration conf, TableDescriptor td, RegionInfo regionInfo,
+    FileSystem fs, Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
+    MasterRegionWALRoller walRoller, String serverName) throws IOException {
+    Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
     Path walRegionDir = FSUtils.getRegionDirFromRootDir(walRootDir, regionInfo);
     Path replayEditsDir = new Path(walRegionDir, REPLAY_EDITS_DIR);
     if (!walFs.exists(replayEditsDir) && !walFs.mkdirs(replayEditsDir)) {
@@ -287,6 +305,39 @@ private static void replayWALs(Configuration conf, FileSystem walFs, Path walRootDir,
     }
   }
 
+  private static void tryMigrate(Configuration conf, FileSystem fs, Path tableDir,
+    RegionInfo regionInfo, TableDescriptor oldTd, TableDescriptor newTd) throws IOException {
+    Class<? extends StoreFileTracker> oldSft =
+      StoreFileTrackerFactory.getTrackerClass(oldTd.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
+    Class<? extends StoreFileTracker> newSft =
+      StoreFileTrackerFactory.getTrackerClass(newTd.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
+    if (oldSft.equals(newSft)) {
+      LOG.debug("old store file tracker {} is the same as the new store file tracker, skip migration",
+        StoreFileTrackerFactory.getStoreFileTrackerName(oldSft));
+      if (!oldTd.equals(newTd)) {
+        // we may change other things such as adding a new family, so here we still need to persist
+        // the new table descriptor
+        LOG.info("Update table descriptor from {} to {}", oldTd, newTd);
+        FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, newTd, true);
+      }
+      return;
+    }
+    LOG.info("Migrate store file tracker from {} to {}", oldSft.getSimpleName(),
+      newSft.getSimpleName());
+    HRegionFileSystem hfs =
+      HRegionFileSystem.openRegionFromFileSystem(conf, fs, tableDir, regionInfo, false);
+    for (ColumnFamilyDescriptor oldCfd : oldTd.getColumnFamilies()) {
+      StoreFileTracker oldTracker = StoreFileTrackerFactory.create(conf, oldTd, oldCfd, hfs);
+      StoreFileTracker newTracker = StoreFileTrackerFactory.create(conf, newTd, oldCfd, hfs);
+      List<StoreFileInfo> files = oldTracker.load();
+      LOG.debug("Store file list for {}: {}", oldCfd.getNameAsString(), files);
+      newTracker.set(files);
+    }
+    // persist the new table descriptor after migration
+    LOG.info("Update table descriptor from {} to {}", oldTd, newTd);
+    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, newTd, true);
+  }
+
   public static MasterRegion create(MasterRegionParams params) throws IOException {
     TableDescriptor td = params.tableDescriptor();
     LOG.info("Create or load local region for table " + td);
@@ -321,16 +372,58 @@ public static MasterRegion create(MasterRegionParams params) throws IOException {

     WALFactory walFactory = new WALFactory(conf, server.getServerName().toString());
     Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
+    Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
+    Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
     HRegion region;
-    if (fs.exists(tableDir)) {
-      // load the existing region.
-      region = open(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
-        server.getServerName().toString());
-    } else {
-      // bootstrapping...
+    if (!fs.exists(tableDir)) {
+      // bootstrap, no doubt
+      if (!fs.mkdirs(initializingFlag)) {
+        throw new IOException("Can not touch initializing flag");
+      }
       region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
-        server.getServerName().toString());
+        server.getServerName().toString(), true);
+    } else {
+      if (!fs.exists(initializedFlag)) {
+        if (!fs.exists(initializingFlag)) {
+          // should be old style, where we do not have the initializing or initialized file,
+          // persist the table descriptor, touch the initialized flag and then open the region.
+          // the store file tracker must be DEFAULT
+          LOG.info("No {} or {} file, try upgrading", INITIALIZING_FLAG, INITIALIZED_FLAG);
+          TableDescriptor oldTd =
+            TableDescriptorBuilder.newBuilder(td).setValue(StoreFileTrackerFactory.TRACKER_IMPL,
+              StoreFileTrackerFactory.Trackers.DEFAULT.name()).build();
+          FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, oldTd, true);
+          if (!fs.mkdirs(initializedFlag)) {
+            throw new IOException("Can not touch initialized flag: " + initializedFlag);
+          }
+          RegionInfo regionInfo = loadRegionInfo(fs, tableDir);
+          tryMigrate(conf, fs, tableDir, regionInfo, oldTd, td);
+          region = open(conf, td, regionInfo, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
+            server.getServerName().toString());
+        } else {
+          // delete all contents besides the initializing flag; here we can make sure tableDir
+          // exists (unless someone deletes it manually...), so we do not do a null check here.
+          for (FileStatus status : fs.listStatus(tableDir)) {
+            if (!status.getPath().getName().equals(INITIALIZING_FLAG)) {
+              fs.delete(status.getPath(), true);
+            }
+          }
+          region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
+            server.getServerName().toString(), false);
+        }
+      } else {
+        if (fs.exists(initializingFlag) && !fs.delete(initializingFlag, true)) {
+          LOG.warn("failed to clean up initializing flag: " + initializingFlag);
+        }
+        // open it, make sure to load the table descriptor from fs
+        TableDescriptor oldTd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
+        RegionInfo regionInfo = loadRegionInfo(fs, tableDir);
+        tryMigrate(conf, fs, tableDir, regionInfo, oldTd, td);
+        region = open(conf, td, regionInfo, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
+          server.getServerName().toString());
+      }
+    }
 
     Path globalArchiveDir = HFileArchiveUtil.getArchivePath(baseConf);
     MasterRegionFlusherAndCompactor flusherAndCompactor = new MasterRegionFlusherAndCompactor(conf,
       server, region, params.flushSize(), params.flushPerChanges(), params.flushIntervalMs(),
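Note: tryMigrate leans on two halves of the StoreFileTracker contract: load() returns the store files a family currently tracks, and set(...) replaces the tracked list wholesale, so switching implementations reduces to reading with the old tracker and writing with the new one, once per column family. Below is a self-contained sketch of that hand-off; Tracker and ListTracker are stand-ins for illustration, not the real interfaces in org.apache.hadoop.hbase.regionserver.storefiletracker.

import java.util.ArrayList;
import java.util.List;

public class TrackerMigrationSketch {

  // Stand-in for StoreFileTracker, reduced to the two calls migration uses.
  interface Tracker {
    List<String> load();          // files currently tracked
    void set(List<String> files); // replace the tracked list wholesale
  }

  static class ListTracker implements Tracker {
    private final List<String> files = new ArrayList<>();

    @Override
    public List<String> load() {
      return new ArrayList<>(files);
    }

    @Override
    public void set(List<String> newFiles) {
      files.clear();
      files.addAll(newFiles);
    }
  }

  // Mirrors the per-family loop in MasterRegion.tryMigrate().
  static void migrate(Tracker oldTracker, Tracker newTracker) {
    newTracker.set(oldTracker.load());
  }

  public static void main(String[] args) {
    Tracker oldTracker = new ListTracker();
    oldTracker.set(List.of("hfile-1", "hfile-2"));
    Tracker newTracker = new ListTracker();
    migrate(oldTracker, newTracker);
    System.out.println(newTracker.load()); // [hfile-1, hfile-2]
  }
}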
hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegionFactory.java
@@ -28,7 +28,10 @@
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -78,6 +81,8 @@ public final class MasterRegionFactory {

   private static final int DEFAULT_RING_BUFFER_SLOT_COUNT = 128;
 
+  public static final String TRACKER_IMPL = "hbase.master.store.region.file-tracker.impl";
+
   public static final TableName TABLE_NAME = TableName.valueOf("master:store");
 
   public static final byte[] PROC_FAMILY = Bytes.toBytes("proc");
@@ -89,10 +94,23 @@ public final class MasterRegionFactory {
         .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).build())
     .setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY)).build();
 
+  private static TableDescriptor withTrackerConfigs(Configuration conf) {
+    String trackerImpl = conf.get(TRACKER_IMPL, conf.get(StoreFileTrackerFactory.TRACKER_IMPL,
+      StoreFileTrackerFactory.Trackers.DEFAULT.name()));
+    Class<? extends StoreFileTracker> trackerClass =
+      StoreFileTrackerFactory.getTrackerClass(trackerImpl);
+    if (StoreFileTrackerFactory.isMigration(trackerClass)) {
+      throw new IllegalArgumentException("Should not set store file tracker to " +
+        StoreFileTrackerFactory.Trackers.MIGRATION.name() + " for master local region");
+    }
+    StoreFileTracker tracker = ReflectionUtils.newInstance(trackerClass, conf, true, null);
+    return tracker.updateWithTrackerConfigs(TableDescriptorBuilder.newBuilder(TABLE_DESC)).build();
+  }
+
   public static MasterRegion create(Server server) throws IOException {
-    MasterRegionParams params = new MasterRegionParams().server(server)
-      .regionDirName(MASTER_STORE_DIR).tableDescriptor(TABLE_DESC);
     Configuration conf = server.getConfiguration();
+    MasterRegionParams params = new MasterRegionParams().server(server)
+      .regionDirName(MASTER_STORE_DIR).tableDescriptor(withTrackerConfigs(conf));
     long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
     long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
     long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
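Note: withTrackerConfigs gives the master local region its own tracker setting, hbase.master.store.region.file-tracker.impl, which falls back to the cluster-wide hbase.store.file-tracker.impl (the value of StoreFileTrackerFactory.TRACKER_IMPL) and finally to DEFAULT, and it rejects MIGRATION outright because the master region migrates implicitly at startup via tryMigrate. A small sketch of the lookup order, assuming Hadoop's Configuration on the classpath; the key strings are taken from the patch:

import org.apache.hadoop.conf.Configuration;

public class TrackerConfigPrecedence {
  static final String MASTER_KEY = "hbase.master.store.region.file-tracker.impl";
  static final String GLOBAL_KEY = "hbase.store.file-tracker.impl";

  // Same nested-default pattern as MasterRegionFactory.withTrackerConfigs().
  static String trackerImpl(Configuration conf) {
    return conf.get(MASTER_KEY, conf.get(GLOBAL_KEY, "DEFAULT"));
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    System.out.println(trackerImpl(conf)); // DEFAULT
    conf.set(GLOBAL_KEY, "FILE");
    System.out.println(trackerImpl(conf)); // FILE (cluster-wide setting)
    conf.set(MASTER_KEY, "DEFAULT");
    System.out.println(trackerImpl(conf)); // DEFAULT (master-region override wins)
  }
}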
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
@@ -84,7 +84,7 @@ public static String getStoreFileTrackerName(Configuration conf) {
     return conf.get(TRACKER_IMPL, Trackers.DEFAULT.name());
   }
 
-  static String getStoreFileTrackerName(Class<? extends StoreFileTracker> clazz) {
+  public static String getStoreFileTrackerName(Class<? extends StoreFileTracker> clazz) {
     Trackers name = CLASS_TO_ENUM.get(clazz);
     return name != null ? name.name() : clazz.getName();
   }
@@ -184,4 +184,8 @@ public static TableDescriptor updateWithTrackerConfigs(Configuration conf,
     }
     return descriptor;
   }
+
+  public static boolean isMigration(Class<?> clazz) {
+    return MigrationStoreFileTracker.class.isAssignableFrom(clazz);
+  }
 }
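Note: getStoreFileTrackerName(Class) is widened to public so MasterRegion's migration path can log a friendly tracker name: built-in implementations map back to their Trackers enum constant, and anything else falls back to the fully-qualified class name. A standalone sketch of that lookup; the nested tracker classes here are placeholders, not the real implementations:

import java.util.Map;

public class TrackerNameSketch {
  static class DefaultTracker {}
  static class FileBasedTracker {}
  static class CustomTracker {}

  // Mirrors the CLASS_TO_ENUM lookup in StoreFileTrackerFactory.
  static final Map<Class<?>, String> CLASS_TO_ENUM =
    Map.of(DefaultTracker.class, "DEFAULT", FileBasedTracker.class, "FILE");

  static String name(Class<?> clazz) {
    String name = CLASS_TO_ENUM.get(clazz);
    return name != null ? name : clazz.getName();
  }

  public static void main(String[] args) {
    System.out.println(name(FileBasedTracker.class)); // FILE
    System.out.println(name(CustomTracker.class));    // fully-qualified class name
  }
}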
hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
@@ -511,6 +511,13 @@ private static Optional<Pair<FileStatus, TableDescriptor>> getTableDescriptorFromFs(
     return td != null ? Optional.of(Pair.newPair(descFile, td)) : Optional.empty();
   }
 
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public static void deleteTableDescriptors(FileSystem fs, Path tableDir) throws IOException {
+    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
+    deleteTableDescriptorFiles(fs, tableInfoDir, Integer.MAX_VALUE);
+  }
+
   /**
    * Deletes files matching the table info file pattern within the given directory whose sequenceId
    * is at most the given max sequenceId.
hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/MasterRegionTestBase.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -85,19 +86,24 @@ public void setUp() throws IOException {
   /**
    * Creates a new MasterRegion using an existing {@code htu} on this class.
    */
-  protected void createMasterRegion() throws IOException {
-    configure(htu.getConfiguration());
+  protected final void createMasterRegion() throws IOException {
+    Configuration conf = htu.getConfiguration();
+    configure(conf);
     choreService = new ChoreService(getClass().getSimpleName());
     cleanerPool = new DirScanPool(htu.getConfiguration());
     Server server = mock(Server.class);
-    when(server.getConfiguration()).thenReturn(htu.getConfiguration());
+    when(server.getConfiguration()).thenReturn(conf);
     when(server.getServerName())
       .thenReturn(ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()));
     when(server.getChoreService()).thenReturn(choreService);
     Path testDir = htu.getDataTestDir();
-    CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
+    CommonFSUtils.setRootDir(conf, testDir);
     MasterRegionParams params = new MasterRegionParams();
-    params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(TD)
+    TableDescriptor td = TableDescriptorBuilder
+      .newBuilder(TD).setValue(StoreFileTrackerFactory.TRACKER_IMPL, conf
+        .get(StoreFileTrackerFactory.TRACKER_IMPL, StoreFileTrackerFactory.Trackers.DEFAULT.name()))
+      .build();
+    params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(td)
       .flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
       .flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(4).maxWals(32).useHsync(false)
       .ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
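Note: the test base now copies whatever tracker the Configuration carries into the region's table descriptor, so the same suite can run under a different tracker implementation simply by overriding the configure hook. A hypothetical subclass (FILE is one of the built-in StoreFileTrackerFactory.Trackers values; the configure(Configuration) signature is assumed from the call sites above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;

public class TestMasterRegionWithFileBasedTracker extends MasterRegionTestBase {

  @Override
  protected void configure(Configuration conf) {
    // run the whole base suite with the file-based store file tracker
    conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
      StoreFileTrackerFactory.Trackers.FILE.name());
  }
}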
(Diffs for the remaining 3 changed files are not shown.)
