HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs to ReplicationSource inside (#2064)

Signed-off-by: Wellington Chevreuil <[email protected]>
infraio committed Sep 8, 2020
1 parent b659616 commit 54e70e5
Showing 12 changed files with 258 additions and 254 deletions.
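
The net effect of the refactor is that callers no longer reach through the source's manager to update replication queue state; the source exposes that bookkeeping itself. A minimal sketch of the call-site direction, under the assumption that the caller sits alongside the shipper; the class name, helper method, and the inclusive flag are illustrative, not lifted from the patch:

// Illustrative sketch only, assumed to live in
// org.apache.hadoop.hbase.replication.regionserver alongside the shipper;
// the real shipper logic is not reproduced here.
package org.apache.hadoop.hbase.replication.regionserver;

final class PostShipBookkeepingSketch {
  static void afterBatchShipped(ReplicationSourceInterface source, WALEntryBatch batch) {
    // Before this commit the caller went through the manager, roughly:
    //   source.getSourceManager().logPositionAndCleanOldLogs(...);
    // After it, the source owns the ReplicationQueueStorage bookkeeping itself:
    source.setWALPosition(batch);                                  // persist the shipped position
    source.cleanOldWALs(batch.getLastWalPath().getName(), false);  // remove older, fully-shipped WALs
  }
}

The three files below show the corresponding changes on the source side; the remaining changed files (manager, shipper, tests) are not expanded here.
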
RecoveredReplicationSource.java
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -44,15 +45,18 @@ public class RecoveredReplicationSource extends ReplicationSource {

private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);

private Path walDir;

private String actualPeerId;

@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException {
super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException {
super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
clusterId, walFileLengthProvider, metrics);
this.walDir = walDir;
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}

@@ -93,7 +97,7 @@ public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOExc
deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
for (Path possibleLogLocation : locs) {
LOG.info("Possible location " + possibleLogLocation.toUri().toString());
if (manager.getFs().exists(possibleLogLocation)) {
if (this.fs.exists(possibleLogLocation)) {
// We found the right new location
LOG.info("Log " + path + " still exists at " + possibleLogLocation);
newPaths.add(possibleLogLocation);
@@ -126,7 +130,7 @@ public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOExc
// N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
// area rather than to the wal area for a particular region server.
private Path getReplSyncUpPath(Path path) throws IOException {
FileStatus[] rss = fs.listStatus(manager.getLogDir());
FileStatus[] rss = fs.listStatus(walDir);
for (FileStatus rs : rss) {
Path p = rs.getPath();
FileStatus[] logs = fs.listStatus(p);
ReplicationSource.java
@@ -28,14 +28,17 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -52,16 +55,20 @@
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -93,7 +100,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected ReplicationQueueInfo replicationQueueInfo;

// The manager of all sources to which we ping back our progress
protected ReplicationSourceManager manager;
ReplicationSourceManager manager;
// Should we stop everything?
protected Server server;
// How long should we sleep for each retry
@@ -130,8 +137,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();

private AtomicLong totalBufferUsed;

public static final String WAIT_ON_ENDPOINT_SECONDS =
"hbase.replication.wait.on.endpoint.seconds";
public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
@@ -183,7 +188,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
* @param metrics metrics for replication source
*/
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException {
@@ -211,7 +216,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;
LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
replicationPeer.getId(), this.currentBandwidth);
@@ -399,9 +403,9 @@ protected ReplicationSourceShipper createNewShipper(String walGroupId,

private ReplicationSourceWALReader createNewWALReader(String walGroupId,
PriorityBlockingQueue<Path> queue, long startPosition) {
return replicationPeer.getPeerConfig().isSerial()
? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
: new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
return replicationPeer.getPeerConfig().isSerial() ?
new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) :
new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
}

/**
@@ -426,11 +430,6 @@ public ReplicationEndpoint getReplicationEndpoint() {
return this.replicationEndpoint;
}

@Override
public ReplicationSourceManager getSourceManager() {
return this.manager;
}

@Override
public void tryThrottle(int batchSize) throws InterruptedException {
checkBandwidthChangeAndResetThrottler();
@@ -735,7 +734,7 @@ public void postShipEdits(List<Entry> entries, int batchSize) {
throttler.addPushSize(batchSize);
}
totalReplicatedEdits.addAndGet(entries.size());
long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
// Record the new buffer usage
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
@@ -770,4 +769,137 @@ void removeWorker(ReplicationSourceShipper worker) {
private String logPeerId(){
return "[Source for peer " + this.getPeer().getId() + "]:";
}

@VisibleForTesting
public void setWALPosition(WALEntryBatch entryBatch) {
String fileName = entryBatch.getLastWalPath().getName();
interruptOrAbortWhenFail(() -> this.queueStorage
.setWALPosition(server.getServerName(), getQueueId(), fileName,
entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
}

@VisibleForTesting
public void cleanOldWALs(String log, boolean inclusive) {
NavigableSet<String> walsToRemove = getWalsToRemove(log, inclusive);
if (walsToRemove.isEmpty()) {
return;
}
// cleanOldWALs may spend some time, especially for sync replication where we may want to
// remove remote wals as the remote cluster may have already been down, so we do it outside
// the lock to avoid block preLogRoll
cleanOldWALs(walsToRemove);
}

private NavigableSet<String> getWalsToRemove(String log, boolean inclusive) {
NavigableSet<String> walsToRemove = new TreeSet<>();
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
try {
this.queueStorage.getWALsInQueue(this.server.getServerName(), getQueueId()).forEach(wal -> {
LOG.debug("getWalsToRemove wal {}", wal);
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
if (walPrefix.equals(logPrefix)) {
walsToRemove.add(wal);
}
});
} catch (ReplicationException e) {
// Just log the exception here, as the recovered replication source will try to cleanup again.
LOG.warn("Failed to read wals in queue {}", getQueueId(), e);
}
return walsToRemove.headSet(log, inclusive);
}

private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
throws IOException {
Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
for (String wal : wals) {
Path walFile = new Path(remoteWALDirForPeer, wal);
try {
if (!fs.delete(walFile, false) && fs.exists(walFile)) {
throw new IOException("Can not delete " + walFile);
}
} catch (FileNotFoundException e) {
// Just ignore since this means the file has already been deleted.
// The javadoc of the FileSystem.delete methods does not specify the behavior of deleting an
// inexistent file, so here we deal with both, i.e, check the return value of the
// FileSystem.delete, and also catch FNFE.
LOG.debug("The remote wal {} has already been deleted?", walFile, e);
}
}
}

private void cleanOldWALs(NavigableSet<String> wals) {
LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
// The intention here is that, we want to delete the remote wal files ASAP as it may effect the
// failover time if you want to transit the remote cluster from S to A. And the infinite retry
// is not a problem, as if we can not contact with the remote HDFS cluster, then usually we can
// not contact with the HBase cluster either, so the replication will be blocked either.
if (isSyncReplication()) {
String peerId = getPeerId();
String remoteWALDir = replicationPeer.getPeerConfig().getRemoteWALDir();
// Filter out the wals need to be removed from the remote directory. Its name should be the
// special format, and also, the peer id in its name should match the peer id for the
// replication source.
List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
.getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
.collect(Collectors.toList());
LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
remoteWALDir, remoteWals);
if (!remoteWals.isEmpty()) {
for (int sleepMultiplier = 0;;) {
try {
removeRemoteWALs(peerId, remoteWALDir, remoteWals);
break;
} catch (IOException e) {
LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
peerId);
}
if (!isSourceActive()) {
// skip the following operations
return;
}
if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
sleepMultiplier, maxRetriesMultiplier)) {
sleepMultiplier++;
}
}
}
}
for (String wal : wals) {
interruptOrAbortWhenFail(
() -> this.queueStorage.removeWAL(server.getServerName(), getQueueId(), wal));
}
}

public void cleanUpHFileRefs(List<String> files) {
interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(getPeerId(), files));
}

@FunctionalInterface
private interface ReplicationQueueOperation {
void exec() throws ReplicationException;
}

/**
* Refresh replication source will terminate the old source first, then the source thread will be
* interrupted. Need to handle it instead of abort the region server.
*/
private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
try {
op.exec();
} catch (ReplicationException e) {
if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
&& e.getCause().getCause() != null && e.getCause()
.getCause() instanceof InterruptedException) {
// ReplicationRuntimeException(a RuntimeException) is thrown out here. The reason is
// that thread is interrupted deep down in the stack, it should pass the following
// processing logic and propagate to the most top layer which can handle this exception
// properly. In this specific case, the top layer is ReplicationSourceShipper#run().
throw new ReplicationRuntimeException(
"Thread is interrupted, the replication source may be terminated",
e.getCause().getCause());
}
server.abort("Failed to operate on replication queue", e);
}
}
}
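
A side note on getWalsToRemove above: the headSet(log, inclusive) call relies on WAL names within one group sharing a prefix and sorting by their numeric suffix, so "older than" reduces to string order. A small standalone illustration with made-up names:

import java.util.NavigableSet;
import java.util.TreeSet;

public class WalHeadSetDemo {
  public static void main(String[] args) {
    NavigableSet<String> walsInQueue = new TreeSet<>();
    // Made-up WAL names: same prefix, increasing suffix, so they sort oldest-first.
    walsInQueue.add("walgroup-a.101");
    walsInQueue.add("walgroup-a.102");
    walsInQueue.add("walgroup-a.103");
    // inclusive=false keeps the given WAL itself; only strictly older entries are returned.
    System.out.println(walsInQueue.headSet("walgroup-a.103", false));
    // prints: [walgroup-a.101, walgroup-a.102]
  }
}
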
ReplicationSourceInterface.java
@@ -43,15 +43,15 @@ public interface ReplicationSourceInterface {

/**
* Initializer for the source
* @param conf the configuration to use
* @param fs the file system to use
* @param manager the manager to use
*
* @param conf the configuration to use
* @param fs the file system to use
* @param server the server for this region server
*/
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException;
void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException;

/**
* Add a log to the list of logs to replicate
@@ -147,11 +147,6 @@ default boolean isSyncReplication() {
*/
ReplicationEndpoint getReplicationEndpoint();

/**
* @return the replication source manager
*/
ReplicationSourceManager getSourceManager();

/**
* @return the wal file length provider
*/
@@ -192,4 +187,18 @@ default Map<String, ReplicationStatus> getWalGroupStatus() {
default boolean isRecovered() {
return false;
}

/**
* Set the current position of WAL to {@link ReplicationQueueStorage}
* @param entryBatch a batch of WAL entries to replicate
*/
void setWALPosition(WALEntryBatch entryBatch);

/**
* Cleans a WAL and all older WALs from replication queue. Called when we are sure that a WAL is
* closed and has no more entries.
* @param walName the name of WAL
* @param inclusive whether we should also remove the given WAL
*/
void cleanOldWALs(String walName, boolean inclusive);
}
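
Because getSourceManager() is gone and the two methods above are now part of the contract, every ReplicationSourceInterface implementation (test doubles included) has to provide them. A hedged, minimal stub under an assumed test-only class name that is not part of the patch:

// Hypothetical test-only helper, not part of the patch; placed in the same
// package so the package-level types resolve.
package org.apache.hadoop.hbase.replication.regionserver;

/** Stubs only the methods this commit adds to the interface. */
abstract class NoOpBookkeepingSource implements ReplicationSourceInterface {
  @Override
  public void setWALPosition(WALEntryBatch entryBatch) {
    // A real source persists entryBatch.getLastWalPosition() to ReplicationQueueStorage.
  }

  @Override
  public void cleanOldWALs(String walName, boolean inclusive) {
    // A real source removes walName (and older WALs in the same group) from its queue.
  }
}
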