Skip to content

Commit

Permalink
HBASE-21688 Address WAL filesystem issues
Browse files Browse the repository at this point in the history
Amending-Author: Josh Elser <[email protected]>
Signed-off-by: Josh Elser <[email protected]>
  • Loading branch information
Vladimir Rodionov authored and joshelser committed Apr 3, 2019
1 parent a0c7232 commit ef9ed98
Show file tree
Hide file tree
Showing 13 changed files with 67 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ public static FileSystem getRootDirFileSystem(final Configuration c) throws IOEx
* @throws IOException e
*/
public static Path getWALRootDir(final Configuration c) throws IOException {

Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
if (!isValidWALRootDir(p, c)) {
return getRootDir(c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
Expand Down Expand Up @@ -944,10 +945,11 @@ public int run(Path inputDir, int numMappers) throws Exception {
if (keys.isEmpty()) throw new RuntimeException("No keys to find");
LOG.info("Count of keys to find: " + keys.size());
for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key));
Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
// Now read all WALs. In two dirs. Presumes certain layout.
Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path walsDir = new Path(
CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
Path oldWalsDir = new Path(
CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers +
" against " + getConf().get(HConstants.HBASE_DIR));
int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
Expand Down Expand Up @@ -506,10 +507,9 @@ private int doSearch(Configuration conf, String keysDir) throws Exception {
if (keys.isEmpty()) throw new RuntimeException("No keys to find");
LOG.info("Count of keys to find: " + keys.size());
for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key));
Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
// Now read all WALs. In two dirs. Presumes certain layout.
Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path walsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
Path oldWalsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
LOG.info("Running Search with keys inputDir=" + inputDir +
" against " + getConf().get(HConstants.HBASE_DIR));
int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;

/**
* WALLink describes a link to a WAL.
Expand All @@ -45,7 +45,7 @@ public class WALLink extends FileLink {
*/
public WALLink(final Configuration conf,
final String serverName, final String logName) throws IOException {
this(FSUtils.getWALRootDir(conf), serverName, logName);
this(CommonFSUtils.getWALRootDir(conf), serverName, logName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
Expand Down Expand Up @@ -93,15 +94,14 @@ public boolean accept(Path p) {
private volatile boolean fsOk = true;

public MasterWalManager(MasterServices services) throws IOException {
this(services.getConfiguration(), services.getMasterFileSystem().getWALFileSystem(),
services.getMasterFileSystem().getWALRootDir(), services);
this(services.getConfiguration(), services.getMasterFileSystem().getWALFileSystem(), services);
}

public MasterWalManager(Configuration conf, FileSystem fs, Path rootDir, MasterServices services)
public MasterWalManager(Configuration conf, FileSystem fs, MasterServices services)
throws IOException {
this.fs = fs;
this.conf = conf;
this.rootDir = rootDir;
this.rootDir = CommonFSUtils.getWALRootDir(conf);
this.services = services;
this.splitLogManager = new SplitLogManager(services, conf);

Expand Down Expand Up @@ -190,9 +190,10 @@ public Set<ServerName> getServerNamesFromWALDirPath(final PathFilter filter) thr

/**
* @return Returns the WALs dir under <code>rootDir</code>
* @throws IOException
*/
Path getWALDirPath() {
return new Path(this.rootDir, HConstants.HREGION_LOGDIR_NAME);
Path getWALDirPath() throws IOException {
return new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
}

/**
Expand All @@ -213,7 +214,7 @@ public FileStatus[] getWALDirPaths(final PathFilter filter) throws IOException {
* it.
*/
@Deprecated
public Set<ServerName> getFailedServersFromLogFolders() {
public Set<ServerName> getFailedServersFromLogFolders() throws IOException {
boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, currentPosition,
new WALEntryStream(logQueue, conf, currentPosition,
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics())) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
Expand Down Expand Up @@ -72,18 +73,17 @@ class WALEntryStream implements Closeable {
/**
* Create an entry stream over the given queue at the given start position
* @param logQueue the queue of WAL paths
* @param fs {@link FileSystem} to use to create {@link Reader} for this stream
* @param conf {@link Configuration} to use to create {@link Reader} for this stream
* @param startPosition the position in the first WAL to start reading at
* @param serverName the server name which all WALs belong to
* @param metrics replication metrics
* @throws IOException
*/
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
MetricsSource metrics) throws IOException {
this.logQueue = logQueue;
this.fs = fs;
this.fs = CommonFSUtils.getWALFileSystem(conf);
this.conf = conf;
this.currentPosition = startPosition;
this.walFileLengthProvider = walFileLengthProvider;
Expand Down Expand Up @@ -301,10 +301,10 @@ private boolean openNextLog() throws IOException {
}

private Path getArchivedLog(Path path) throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
Path walRootDir = CommonFSUtils.getWALRootDir(conf);

// Try found the log in old dir
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
Expand All @@ -313,7 +313,7 @@ private Path getArchivedLog(Path path) throws IOException {

// Try found the log in the seperate old log dir
oldLogDir =
new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) {
Expand Down Expand Up @@ -370,7 +370,8 @@ private void openReader(Path path) throws IOException {
// For HBASE-15019
private void recoverLease(final Configuration conf, final Path path) {
try {
final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);

final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1669,9 +1669,9 @@ public boolean rebuildMeta(boolean fix) throws IOException,
* Meta recovery WAL directory inside WAL directory path.
*/
private void removeHBCKMetaRecoveryWALDir(String walFactoryId) throws IOException {
Path rootdir = FSUtils.getRootDir(getConf());
Path walLogDir = new Path(new Path(rootdir, HConstants.HREGION_LOGDIR_NAME), walFactoryId);
FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
Path walLogDir = new Path(new Path(CommonFSUtils.getWALRootDir(getConf()),
HConstants.HREGION_LOGDIR_NAME), walFactoryId);
FileSystem fs = CommonFSUtils.getWALFileSystem(getConf());
FileStatus[] walFiles = FSUtils.listStatus(fs, walLogDir, null);
if (walFiles == null || walFiles.length == 0) {
LOG.info("HBCK meta recovery WAL directory is empty, removing it now.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,8 @@ public static boolean isArchivedLogFile(Path p) {
* @throws IOException exception
*/
public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path walRootDir = FSUtils.getWALRootDir(conf);
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
ServerName serverName = getServerNameFromWALDirectoryName(path);
if (serverName == null) {
Expand All @@ -429,7 +429,7 @@ public static Path getArchivedLogPath(Path path, Configuration conf) throws IOEx
oldLogDir = new Path(oldLogDir, serverName.getServerName());
}
Path archivedLogLocation = new Path(oldLogDir, path.getName());
final FileSystem fs = FSUtils.getCurrentFileSystem(conf);
final FileSystem fs = FSUtils.getWALFileSystem(conf);

if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void testHBaseCluster() throws Exception {

// Now we need to find the log file, its locations, and look at it

String rootDir = new Path(FSUtils.getRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
String rootDir = new Path(FSUtils.getWALRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
"/" + targetRs.getServerName().toString()).toUri().getPath();

DistributedFileSystem mdfs = (DistributedFileSystem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ public void testDelayedDeleteOnFailure() throws Exception {
startCluster(1);
final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
final FileSystem fs = master.getMasterFileSystem().getFileSystem();
final Path logDir = new Path(new Path(FSUtils.getRootDir(conf), HConstants.HREGION_LOGDIR_NAME),
final Path logDir = new Path(new Path(FSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME),
ServerName.valueOf("x", 1, 1).toString());
fs.mkdirs(logDir);
ExecutorService executor = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
import java.util.List;
import java.util.Set;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -60,14 +60,27 @@ public class TestMasterWALManager {
public void before() throws IOException {
MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class);
Mockito.when(mfs.getWALFileSystem()).thenReturn(HTU.getTestFileSystem());
Path walRootDir = HTU.createWALRootDir();
final Path walRootDir = HTU.getDataTestDir();;

Mockito.when(mfs.getWALRootDir()).thenReturn(walRootDir);
this.masterServices = Mockito.mock(MasterServices.class);
Mockito.when(this.masterServices.getConfiguration()).thenReturn(HTU.getConfiguration());
Mockito.when(this.masterServices.getMasterFileSystem()).thenReturn(mfs);
Mockito.when(this.masterServices.getServerName()).
thenReturn(ServerName.parseServerName("master.example.org,0123,456"));
this.mwm = new MasterWalManager(this.masterServices);
Mockito.when(this.masterServices.getServerName())
.thenReturn(ServerName.parseServerName("master.example.org,0123,456"));
this.mwm = new MasterWalManager(this.masterServices) {

@Override
Path getWALDirPath() throws IOException {
return walRootDir;
}

@Override
Path getWALDirectoryName(ServerName serverName) {
return new Path(walRootDir,
AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
}
};
}

@Test
Expand Down
Loading

0 comments on commit ef9ed98

Please sign in to comment.