HBASE-22617 Recovered WAL directories not getting cleaned up (#330)
Signed-off-by: Guanghao Zhang <[email protected]>
Signed-off-by: Andrew Purtell <[email protected]>
Apache9 committed Jun 25, 2019
1 parent a1aab95 commit a172b48
Showing 18 changed files with 244 additions and 169 deletions.
@@ -28,7 +28,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -429,11 +428,9 @@ private static boolean isValidWALRootDir(Path walDir, final Configuration c) thr
* @return the region directory used to store WALs under the WALRootDir
* @throws IOException if there is an exception determining the WALRootDir
*/
public static Path getWALRegionDir(final Configuration conf,
final TableName tableName, final String encodedRegionName)
throws IOException {
return new Path(getWALTableDir(conf, tableName),
encodedRegionName);
public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
final String encodedRegionName) throws IOException {
return new Path(getWALTableDir(conf, tableName), encodedRegionName);
}

/**
@@ -445,8 +442,22 @@ public static Path getWALRegionDir(final Configuration conf,
*/
public static Path getWALTableDir(final Configuration conf, final TableName tableName)
throws IOException {
return new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
tableName.getQualifierAsString());
Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
tableName.getQualifierAsString());
}

/**
* For backward compatibility with HBASE-20734, where we store recovered edits in a wrong
* directory without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
* @deprecated For compatibility, will be removed in 4.0.0.
*/
@Deprecated
public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
final String encodedRegionName) throws IOException {
Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
tableName.getQualifierAsString());
return new Path(wrongTableDir, encodedRegionName);
}

/**
@@ -1059,5 +1070,4 @@ public StreamLacksCapabilityException(String message) {
super(message);
}
}

}
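For context on the FSUtils change: getWALTableDir now puts table directories under HConstants.BASE_NAMESPACE_DIR (the "data" directory) on the WAL filesystem, so region WAL directories mirror the data filesystem layout, while the deprecated getWrongWALRegionDir reproduces the old, incorrect path so that directories written by HBASE-20734-era code can still be found and cleaned up. A minimal sketch of resolving both paths; the table and encoded region name are made-up placeholders, and an HBase classpath with a configured hbase.rootdir (and optionally hbase.wal.dir) is assumed:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.FSUtils;

public class WalRegionDirLayout {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Hypothetical table and encoded region name, for illustration only.
    TableName table = TableName.valueOf("ns", "t1");
    String encodedRegionName = "abcdef1234567890abcdef1234567890";

    // Correct location after this commit: <walRootDir>/data/ns/t1/<encodedRegionName>
    Path correct = FSUtils.getWALRegionDir(conf, table, encodedRegionName);
    // Legacy location written by HBASE-20734-era code: <walRootDir>/ns/t1/<encodedRegionName>
    Path legacy = FSUtils.getWrongWALRegionDir(conf, table, encodedRegionName);

    System.out.println("correct = " + correct);
    System.out.println("legacy  = " + legacy);
  }
}
```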
@@ -38,7 +38,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -87,23 +86,21 @@ private HFileArchiver() {
public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info)
throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
Path regionDir = HRegion.getRegionDir(rootDir, info);
Path regionDir = FSUtils.getRegionDirFromRootDir(rootDir, info);
return fs.exists(regionDir);
}

/**
* Cleans up all the files for a HRegion by archiving the HFiles to the
* archive directory
* Cleans up all the files for a HRegion by archiving the HFiles to the archive directory
* @param conf the configuration to use
* @param fs the file system object
* @param info RegionInfo for region to be deleted
* @throws IOException
*/
public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info)
throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
archiveRegion(fs, rootDir, FSUtils.getTableDir(rootDir, info.getTable()),
HRegion.getRegionDir(rootDir, info));
FSUtils.getRegionDirFromRootDir(rootDir, info));
}

/**
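Replacing HRegion.getRegionDir with FSUtils.getRegionDirFromRootDir lets HFileArchiver resolve a region's directory without depending on the regionserver HRegion class. A short sketch of the call, assuming an HBase classpath and a hypothetical table and region:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.util.FSUtils;

public class RegionDirFromRootDir {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Hypothetical region of a hypothetical table "t1".
    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf("t1")).build();
    Path rootDir = FSUtils.getRootDir(conf);
    // Resolves <rootDir>/data/default/t1/<encodedRegionName> without going through HRegion.
    Path regionDir = FSUtils.getRegionDirFromRootDir(rootDir, info);
    System.out.println(regionDir);
  }
}
```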
@@ -194,7 +194,9 @@ public FileSystem getFileSystem() {
return this.fs;
}

protected FileSystem getWALFileSystem() { return this.walFs; }
public FileSystem getWALFileSystem() {
return this.walFs;
}

public Configuration getConfiguration() {
return this.conf;
@@ -220,7 +222,7 @@ public Path getWALRootDir() {
* @return the directory for a give {@code region}.
*/
public Path getRegionDir(RegionInfo region) {
return FSUtils.getRegionDir(FSUtils.getTableDir(getRootDir(), region.getTable()), region);
return FSUtils.getRegionDirFromRootDir(getRootDir(), region);
}

/**
@@ -18,22 +18,26 @@
package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.GCRegionState;
@@ -64,46 +68,65 @@ public TableOperationType getTableOperationType() {

@Override
protected Flow executeFromState(MasterProcedureEnv env, GCRegionState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " execute state=" + state);
}
MasterServices masterServices = env.getMasterServices();
try {
switch (state) {
case GC_REGION_PREPARE:
// Nothing to do to prepare.
setNextState(GCRegionState.GC_REGION_ARCHIVE);
break;
case GC_REGION_ARCHIVE:
FileSystem fs = masterServices.getMasterFileSystem().getFileSystem();
if (HFileArchiver.exists(masterServices.getConfiguration(), fs, getRegion())) {
if (LOG.isDebugEnabled()) LOG.debug("Archiving region=" + getRegion().getShortNameToLog());
HFileArchiver.archiveRegion(masterServices.getConfiguration(), fs, getRegion());
}
setNextState(GCRegionState.GC_REGION_PURGE_METADATA);
break;
case GC_REGION_PURGE_METADATA:
// TODO: Purge metadata before removing from HDFS? This ordering is copied
// from CatalogJanitor.
AssignmentManager am = masterServices.getAssignmentManager();
if (am != null) {
if (am.getRegionStates() != null) {
am.getRegionStates().deleteRegion(getRegion());
case GC_REGION_PREPARE:
// Nothing to do to prepare.
setNextState(GCRegionState.GC_REGION_ARCHIVE);
break;
case GC_REGION_ARCHIVE:
MasterFileSystem mfs = masterServices.getMasterFileSystem();
FileSystem fs = mfs.getFileSystem();
if (HFileArchiver.exists(masterServices.getConfiguration(), fs, getRegion())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Archiving region=" + getRegion().getShortNameToLog());
}
HFileArchiver.archiveRegion(masterServices.getConfiguration(), fs, getRegion());
}
FileSystem walFs = mfs.getWALFileSystem();
// Cleanup the directories on WAL filesystem also
Path regionWALDir = FSUtils.getWALRegionDir(env.getMasterConfiguration(),
getRegion().getTable(), getRegion().getEncodedName());
if (walFs.exists(regionWALDir)) {
if (!walFs.delete(regionWALDir, true)) {
LOG.debug("Failed to delete {}", regionWALDir);
}
}
Path wrongRegionWALDir = FSUtils.getWrongWALRegionDir(env.getMasterConfiguration(),
getRegion().getTable(), getRegion().getEncodedName());
if (walFs.exists(wrongRegionWALDir)) {
if (!walFs.delete(wrongRegionWALDir, true)) {
LOG.debug("Failed to delete {}", regionWALDir);
}
}
setNextState(GCRegionState.GC_REGION_PURGE_METADATA);
break;
case GC_REGION_PURGE_METADATA:
// TODO: Purge metadata before removing from HDFS? This ordering is copied
// from CatalogJanitor.
AssignmentManager am = masterServices.getAssignmentManager();
if (am != null) {
if (am.getRegionStates() != null) {
am.getRegionStates().deleteRegion(getRegion());
}
}
MetaTableAccessor.deleteRegion(masterServices.getConnection(), getRegion());
masterServices.getServerManager().removeRegion(getRegion());
FavoredNodesManager fnm = masterServices.getFavoredNodesManager();
if (fnm != null) {
fnm.deleteFavoredNodesForRegions(Lists.newArrayList(getRegion()));
}
}
MetaTableAccessor.deleteRegion(masterServices.getConnection(), getRegion());
masterServices.getServerManager().removeRegion(getRegion());
FavoredNodesManager fnm = masterServices.getFavoredNodesManager();
if (fnm != null) {
fnm.deleteFavoredNodesForRegions(Lists.newArrayList(getRegion()));
}
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);
}
} catch (IOException ioe) {
// TODO: This is going to spew log?
// TODO: This is going to spew log? Add retry backoff
LOG.warn("Error trying to GC " + getRegion().getShortNameToLog() + "; retrying...", ioe);
}
return Flow.HAS_MORE_STATE;
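The GC_REGION_ARCHIVE state now also deletes the region's directory on the WAL filesystem, checking both the correct location and the legacy one from HBASE-20734. Below is a standalone sketch of that cleanup for illustration only; it is not part of this commit and assumes an HBase classpath and a configured WAL root directory:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.FSUtils;

/** Sketch of the WAL-side cleanup that the GC_REGION_ARCHIVE state now performs. */
public final class RecoveredWalDirCleaner {

  /** Deletes a region's directory on the WAL filesystem in both the new and the legacy location. */
  public static void deleteRegionWalDirs(Configuration conf, TableName table,
      String encodedRegionName) throws IOException {
    FileSystem walFs = FSUtils.getWALRootDir(conf).getFileSystem(conf);
    Path regionWALDir = FSUtils.getWALRegionDir(conf, table, encodedRegionName);
    Path wrongRegionWALDir = FSUtils.getWrongWALRegionDir(conf, table, encodedRegionName);
    for (Path dir : new Path[] { regionWALDir, wrongRegionWALDir }) {
      if (walFs.exists(dir) && !walFs.delete(dir, true)) {
        // Deletion failure is not fatal here; the procedure just logs and retries later.
        System.err.println("Failed to delete " + dir);
      }
    }
  }

  private RecoveredWalDirCleaner() {
  }
}
```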
@@ -843,16 +843,16 @@ private ServerName getServerName(final MasterProcedureEnv env) {
}

private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
MasterFileSystem fs = env.getMasterFileSystem();
long maxSequenceId = -1L;
for (RegionInfo region : regionsToMerge) {
maxSequenceId =
Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(
walFS, getWALRegionDir(env, region)));
Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(env.getMasterConfiguration(),
region, fs::getFileSystem, fs::getWALFileSystem));
}
if (maxSequenceId > 0) {
WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion),
maxSequenceId);
WALSplitter.writeRegionSequenceIdFile(fs.getWALFileSystem(),
getWALRegionDir(env, mergedRegion), maxSequenceId);
}
}

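writeMaxSequenceIdFile now uses the new WALSplitter.getMaxRegionSequenceId overload, which takes the configuration, the region, and two filesystem suppliers (root and WAL) instead of a pre-resolved region directory. A hedged sketch of the same call shape from code holding a MasterFileSystem; the class and method names here are illustrative, not part of this commit:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.wal.WALSplitter;

public class MaxSequenceIdLookup {
  /**
   * Mirrors the call shape used above: the configuration, the region, and two filesystem
   * suppliers (root filesystem and WAL filesystem) are handed to WALSplitter, which reads the
   * highest recorded sequence id for the region.
   */
  static long readMaxSequenceId(Configuration conf, MasterFileSystem mfs, RegionInfo region)
      throws IOException {
    return WALSplitter.getMaxRegionSequenceId(conf, region, mfs::getFileSystem,
      mfs::getWALFileSystem);
  }
}
```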
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
@@ -35,13 +34,13 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
@@ -217,10 +216,9 @@ private void updateRegionLocation(RegionInfo regionInfo, State state, Put put)
}

private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException {
FileSystem walFS = master.getMasterWalManager().getFileSystem();
long maxSeqId =
WALSplitter.getMaxRegionSequenceId(walFS, FSUtils.getWALRegionDir(
master.getConfiguration(), region.getTable(), region.getEncodedName()));
MasterFileSystem fs = master.getMasterFileSystem();
long maxSeqId = WALSplitter.getMaxRegionSequenceId(master.getConfiguration(), region,
fs::getFileSystem, fs::getWALFileSystem);
return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM;
}

@@ -34,7 +34,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -596,7 +595,7 @@ public void createDaughterRegions(final MasterProcedureEnv env) throws IOExcepti
final FileSystem fs = mfs.getFileSystem();
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
regionFs.createSplitsDir();
regionFs.createSplitsDir(daughter_1_RI, daughter_2_RI);

Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);

@@ -903,14 +902,14 @@ private int getRegionReplication(final MasterProcedureEnv env) throws IOExceptio
}

private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
long maxSequenceId =
WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion()));
MasterFileSystem fs = env.getMasterFileSystem();
long maxSequenceId = WALSplitter.getMaxRegionSequenceId(env.getMasterConfiguration(),
getParentRegion(), fs::getFileSystem, fs::getWALFileSystem);
if (maxSequenceId > 0) {
WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_1_RI),
maxSequenceId);
WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_2_RI),
maxSequenceId);
WALSplitter.writeRegionSequenceIdFile(fs.getWALFileSystem(),
getWALRegionDir(env, daughter_1_RI), maxSequenceId);
WALSplitter.writeRegionSequenceIdFile(fs.getWALFileSystem(),
getWALRegionDir(env, daughter_2_RI), maxSequenceId);
}
}

@@ -314,12 +314,11 @@ protected static void deleteFromFs(final MasterProcedureEnv env,

// Archive regions from FS (temp directory)
if (archive) {
List<Path> regionDirList = regions.stream()
.filter(RegionReplicaUtil::isDefaultReplica)
.map(region -> FSUtils.getRegionDir(tempTableDir, region))
List<Path> regionDirList = regions.stream().filter(RegionReplicaUtil::isDefaultReplica)
.map(region -> FSUtils.getRegionDirFromTableDir(tempTableDir, region))
.collect(Collectors.toList());
HFileArchiver.archiveRegions(env.getMasterConfiguration(), fs, mfs.getRootDir(),
tempTableDir, regionDirList);
HFileArchiver.archiveRegions(env.getMasterConfiguration(), fs, mfs.getRootDir(), tempTableDir,
regionDirList);
LOG.debug("Table '{}' archived!", tableName);
}

@@ -343,6 +342,13 @@ protected static void deleteFromFs(final MasterProcedureEnv env,
throw new IOException("Couldn't delete mob dir " + mobTableDir);
}
}

// Delete the directory on wal filesystem
FileSystem walFs = mfs.getWALFileSystem();
Path tableWALDir = FSUtils.getWALTableDir(env.getMasterConfiguration(), tableName);
if (walFs.exists(tableWALDir) && !walFs.delete(tableWALDir, true)) {
throw new IOException("Couldn't delete table dir on wal filesystem" + tableWALDir);
}
}

/**
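DeleteTableProcedure now removes the table's directory on the WAL filesystem as well, so recovered-edits data no longer lingers after a table is dropped. A sketch of the same cleanup as a standalone snippet, for illustration only; it assumes the table has already been deleted and that an HBase classpath and WAL root directory are available:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.FSUtils;

/** Sketch: remove a dropped table's leftover directory on the WAL filesystem. */
public class DropTableWalDir {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Table name passed on the command line, e.g. "ns:table"; the table must already be deleted.
    TableName tableName = TableName.valueOf(args[0]);
    Path tableWALDir = FSUtils.getWALTableDir(conf, tableName);
    FileSystem walFs = FSUtils.getWALRootDir(conf).getFileSystem(conf);
    if (walFs.exists(tableWALDir) && !walFs.delete(tableWALDir, true)) {
      throw new IOException("Couldn't delete table dir on wal filesystem " + tableWALDir);
    }
  }
}
```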
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -31,6 +30,7 @@
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -113,13 +113,13 @@ protected Flow executeFromState(final MasterProcedureEnv env, final DisableTable
case DISABLE_TABLE_ADD_REPLICATION_BARRIER:
if (env.getMasterServices().getTableDescriptors().get(tableName)
.hasGlobalReplicationScope()) {
FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
MasterFileSystem fs = env.getMasterFileSystem();
try (BufferedMutator mutator = env.getMasterServices().getConnection()
.getBufferedMutator(TableName.META_TABLE_NAME)) {
for (RegionInfo region : env.getAssignmentManager().getRegionStates()
.getRegionsOfTable(tableName)) {
long maxSequenceId =
WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region));
long maxSequenceId = WALSplitter.getMaxRegionSequenceId(
env.getMasterConfiguration(), region, fs::getFileSystem, fs::getWALFileSystem);
long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM;
mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum,
EnvironmentEdgeManager.currentTime()));
(The diffs for the remaining changed files in this commit are not shown here.)
