diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 00aa026093fa..62685eea6213 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.PriorityBlockingQueue; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -44,15 +45,18 @@ public class RecoveredReplicationSource extends ReplicationSource { private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class); + private Path walDir; + private String actualPeerId; @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException { - super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode, + public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, + ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, + String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + MetricsSource metrics) throws IOException { + super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode, clusterId, walFileLengthProvider, metrics); + this.walDir = walDir; this.actualPeerId = this.replicationQueueInfo.getPeerId(); } @@ -93,7 +97,7 @@ public void locateRecoveredPaths(PriorityBlockingQueue queue) throws IOExc deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) }; for (Path possibleLogLocation : locs) { LOG.info("Possible location " + possibleLogLocation.toUri().toString()); - if (manager.getFs().exists(possibleLogLocation)) { + if (this.fs.exists(possibleLogLocation)) { // We found the right new location LOG.info("Log " + path + " still exists at " + possibleLogLocation); newPaths.add(possibleLogLocation); @@ -126,7 +130,7 @@ public void locateRecoveredPaths(PriorityBlockingQueue queue) throws IOExc // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal // area rather than to the wal area for a particular region server. private Path getReplSyncUpPath(Path path) throws IOException { - FileStatus[] rss = fs.listStatus(manager.getLogDir()); + FileStatus[] rss = fs.listStatus(walDir); for (FileStatus rs : rss) { Path p = rs.getPath(); FileStatus[] logs = fs.listStatus(p); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 85c46570dcf5..fd9fb311b2ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -28,7 +28,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; @@ -36,6 +38,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -52,16 +55,20 @@ import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -93,7 +100,7 @@ public class ReplicationSource implements ReplicationSourceInterface { protected ReplicationQueueInfo replicationQueueInfo; // The manager of all sources to which we ping back our progress - protected ReplicationSourceManager manager; + ReplicationSourceManager manager; // Should we stop everything? protected Server server; // How long should we sleep for each retry @@ -130,8 +137,6 @@ public class ReplicationSource implements ReplicationSourceInterface { protected final ConcurrentHashMap workerThreads = new ConcurrentHashMap<>(); - private AtomicLong totalBufferUsed; - public static final String WAIT_ON_ENDPOINT_SECONDS = "hbase.replication.wait.on.endpoint.seconds"; public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30; @@ -183,7 +188,7 @@ public class ReplicationSource implements ReplicationSourceInterface { * @param metrics metrics for replication source */ @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { @@ -211,7 +216,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); currentBandwidth = getCurrentBandwidth(); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); - this.totalBufferUsed = manager.getTotalBufferUsed(); this.walFileLengthProvider = walFileLengthProvider; LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, replicationPeer.getId(), this.currentBandwidth); @@ -399,9 +403,9 @@ protected ReplicationSourceShipper createNewShipper(String walGroupId, private ReplicationSourceWALReader createNewWALReader(String walGroupId, PriorityBlockingQueue queue, long startPosition) { - return replicationPeer.getPeerConfig().isSerial() - ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) - : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); + return replicationPeer.getPeerConfig().isSerial() ? + new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) : + new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); } /** @@ -426,11 +430,6 @@ public ReplicationEndpoint getReplicationEndpoint() { return this.replicationEndpoint; } - @Override - public ReplicationSourceManager getSourceManager() { - return this.manager; - } - @Override public void tryThrottle(int batchSize) throws InterruptedException { checkBandwidthChangeAndResetThrottler(); @@ -735,7 +734,7 @@ public void postShipEdits(List entries, int batchSize) { throttler.addPushSize(batchSize); } totalReplicatedEdits.addAndGet(entries.size()); - long newBufferUsed = totalBufferUsed.addAndGet(-batchSize); + long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize); // Record the new buffer usage this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); } @@ -770,4 +769,137 @@ void removeWorker(ReplicationSourceShipper worker) { private String logPeerId(){ return "[Source for peer " + this.getPeer().getId() + "]:"; } + + @VisibleForTesting + public void setWALPosition(WALEntryBatch entryBatch) { + String fileName = entryBatch.getLastWalPath().getName(); + interruptOrAbortWhenFail(() -> this.queueStorage + .setWALPosition(server.getServerName(), getQueueId(), fileName, + entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds())); + } + + @VisibleForTesting + public void cleanOldWALs(String log, boolean inclusive) { + NavigableSet walsToRemove = getWalsToRemove(log, inclusive); + if (walsToRemove.isEmpty()) { + return; + } + // cleanOldWALs may spend some time, especially for sync replication where we may want to + // remove remote wals as the remote cluster may have already been down, so we do it outside + // the lock to avoid block preLogRoll + cleanOldWALs(walsToRemove); + } + + private NavigableSet getWalsToRemove(String log, boolean inclusive) { + NavigableSet walsToRemove = new TreeSet<>(); + String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log); + try { + this.queueStorage.getWALsInQueue(this.server.getServerName(), getQueueId()).forEach(wal -> { + LOG.debug("getWalsToRemove wal {}", wal); + String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal); + if (walPrefix.equals(logPrefix)) { + walsToRemove.add(wal); + } + }); + } catch (ReplicationException e) { + // Just log the exception here, as the recovered replication source will try to cleanup again. + LOG.warn("Failed to read wals in queue {}", getQueueId(), e); + } + return walsToRemove.headSet(log, inclusive); + } + + private void removeRemoteWALs(String peerId, String remoteWALDir, Collection wals) + throws IOException { + Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId); + FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir); + for (String wal : wals) { + Path walFile = new Path(remoteWALDirForPeer, wal); + try { + if (!fs.delete(walFile, false) && fs.exists(walFile)) { + throw new IOException("Can not delete " + walFile); + } + } catch (FileNotFoundException e) { + // Just ignore since this means the file has already been deleted. + // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting an + // inexistent file, so here we deal with both, i.e, check the return value of the + // FileSystem.delete, and also catch FNFE. + LOG.debug("The remote wal {} has already been deleted?", walFile, e); + } + } + } + + private void cleanOldWALs(NavigableSet wals) { + LOG.debug("Removing {} logs in the list: {}", wals.size(), wals); + // The intention here is that, we want to delete the remote wal files ASAP as it may effect the + // failover time if you want to transit the remote cluster from S to A. And the infinite retry + // is not a problem, as if we can not contact with the remote HDFS cluster, then usually we can + // not contact with the HBase cluster either, so the replication will be blocked either. + if (isSyncReplication()) { + String peerId = getPeerId(); + String remoteWALDir = replicationPeer.getPeerConfig().getRemoteWALDir(); + // Filter out the wals need to be removed from the remote directory. Its name should be the + // special format, and also, the peer id in its name should match the peer id for the + // replication source. + List remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider + .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false)) + .collect(Collectors.toList()); + LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(), + remoteWALDir, remoteWals); + if (!remoteWals.isEmpty()) { + for (int sleepMultiplier = 0;;) { + try { + removeRemoteWALs(peerId, remoteWALDir, remoteWals); + break; + } catch (IOException e) { + LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir, + peerId); + } + if (!isSourceActive()) { + // skip the following operations + return; + } + if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries, + sleepMultiplier, maxRetriesMultiplier)) { + sleepMultiplier++; + } + } + } + } + for (String wal : wals) { + interruptOrAbortWhenFail( + () -> this.queueStorage.removeWAL(server.getServerName(), getQueueId(), wal)); + } + } + + public void cleanUpHFileRefs(List files) { + interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(getPeerId(), files)); + } + + @FunctionalInterface + private interface ReplicationQueueOperation { + void exec() throws ReplicationException; + } + + /** + * Refresh replication source will terminate the old source first, then the source thread will be + * interrupted. Need to handle it instead of abort the region server. + */ + private void interruptOrAbortWhenFail(ReplicationQueueOperation op) { + try { + op.exec(); + } catch (ReplicationException e) { + if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException + && e.getCause().getCause() != null && e.getCause() + .getCause() instanceof InterruptedException) { + // ReplicationRuntimeException(a RuntimeException) is thrown out here. The reason is + // that thread is interrupted deep down in the stack, it should pass the following + // processing logic and propagate to the most top layer which can handle this exception + // properly. In this specific case, the top layer is ReplicationSourceShipper#run(). + throw new ReplicationRuntimeException( + "Thread is interrupted, the replication source may be terminated", + e.getCause().getCause()); + } + server.abort("Failed to operate on replication queue", e); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 33a413f73a0a..321edc2bf08b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -43,15 +43,15 @@ public interface ReplicationSourceInterface { /** * Initializer for the source - * @param conf the configuration to use - * @param fs the file system to use - * @param manager the manager to use + * + * @param conf the configuration to use + * @param fs the file system to use * @param server the server for this region server */ - void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException; + void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, + ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, + String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + MetricsSource metrics) throws IOException; /** * Add a log to the list of logs to replicate @@ -147,11 +147,6 @@ default boolean isSyncReplication() { */ ReplicationEndpoint getReplicationEndpoint(); - /** - * @return the replication source manager - */ - ReplicationSourceManager getSourceManager(); - /** * @return the wal file length provider */ @@ -192,4 +187,18 @@ default Map getWalGroupStatus() { default boolean isRecovered() { return false; } + + /** + * Set the current position of WAL to {@link ReplicationQueueStorage} + * @param entryBatch a batch of WAL entries to replicate + */ + void setWALPosition(WALEntryBatch entryBatch); + + /** + * Cleans a WAL and all older WALs from replication queue. Called when we are sure that a WAL is + * closed and has no more entries. + * @param walName the name of WAL + * @param inclusive whether we should also remove the given WAL + */ + void cleanOldWALs(String walName, boolean inclusive); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index a222f4bb885a..32126978e1ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -17,10 +17,8 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -59,14 +57,11 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationTracker; -import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -324,7 +319,7 @@ private ReplicationSourceInterface createSource(String queueId, ReplicationPeer MetricsSource metrics = new MetricsSource(queueId); sourceMetrics.put(queueId, metrics); // init replication source - src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId, + src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, queueId, clusterId, walFileLengthProvider, metrics); return src; } @@ -528,29 +523,6 @@ private interface ReplicationQueueOperation { void exec() throws ReplicationException; } - /** - * Refresh replication source will terminate the old source first, then the source thread will be - * interrupted. Need to handle it instead of abort the region server. - */ - private void interruptOrAbortWhenFail(ReplicationQueueOperation op) { - try { - op.exec(); - } catch (ReplicationException e) { - if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException - && e.getCause().getCause() != null && e.getCause() - .getCause() instanceof InterruptedException) { - // ReplicationRuntimeException(a RuntimeException) is thrown out here. The reason is - // that thread is interrupted deep down in the stack, it should pass the following - // processing logic and propagate to the most top layer which can handle this exception - // properly. In this specific case, the top layer is ReplicationSourceShipper#run(). - throw new ReplicationRuntimeException( - "Thread is interrupted, the replication source may be terminated", - e.getCause().getCause()); - } - server.abort("Failed to operate on replication queue", e); - } - } - private void abortWhenFail(ReplicationQueueOperation op) { try { op.exec(); @@ -576,107 +548,6 @@ private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) thro } } - /** - * This method will log the current position to storage. And also clean old logs from the - * replication queue. - * @param source the replication source - * @param entryBatch the wal entry batch we just shipped - */ - public void logPositionAndCleanOldLogs(ReplicationSourceInterface source, - WALEntryBatch entryBatch) { - String fileName = entryBatch.getLastWalPath().getName(); - interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), - source.getQueueId(), fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds())); - cleanOldLogs(fileName, entryBatch.isEndOfFile(), source); - } - - /** - * Cleans a log file and all older logs from replication queue. Called when we are sure that a log - * file is closed and has no more entries. - * @param log Path to the log - * @param inclusive whether we should also remove the given log file - * @param source the replication source - */ - @VisibleForTesting - void cleanOldLogs(String log, boolean inclusive, - ReplicationSourceInterface source) { - NavigableSet walsToRemove; - synchronized (this.latestPaths) { - walsToRemove = getWalsToRemove(source.getQueueId(), log, inclusive); - } - if (walsToRemove.isEmpty()) { - return; - } - // cleanOldLogs may spend some time, especially for sync replication where we may want to - // remove remote wals as the remote cluster may have already been down, so we do it outside - // the lock to avoid block preLogRoll - cleanOldLogs(walsToRemove, source); - } - - private void removeRemoteWALs(String peerId, String remoteWALDir, Collection wals) - throws IOException { - Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId); - FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir); - for (String wal : wals) { - Path walFile = new Path(remoteWALDirForPeer, wal); - try { - if (!fs.delete(walFile, false) && fs.exists(walFile)) { - throw new IOException("Can not delete " + walFile); - } - } catch (FileNotFoundException e) { - // Just ignore since this means the file has already been deleted. - // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting an - // inexistent file, so here we deal with both, i.e, check the return value of the - // FileSystem.delete, and also catch FNFE. - LOG.debug("The remote wal {} has already been deleted?", walFile, e); - } - } - } - - private void cleanOldLogs(NavigableSet wals, ReplicationSourceInterface source) { - LOG.debug("Removing {} logs in the list: {}", wals.size(), wals); - // The intention here is that, we want to delete the remote wal files ASAP as it may effect the - // failover time if you want to transit the remote cluster from S to A. And the infinite retry - // is not a problem, as if we can not contact with the remote HDFS cluster, then usually we can - // not contact with the HBase cluster either, so the replication will be blocked either. - if (source.isSyncReplication()) { - String peerId = source.getPeerId(); - String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir(); - // Filter out the wals need to be removed from the remote directory. Its name should be the - // special format, and also, the peer id in its name should match the peer id for the - // replication source. - List remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider - .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false)) - .collect(Collectors.toList()); - LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(), - remoteWALDir, remoteWals); - if (!remoteWals.isEmpty()) { - for (int sleepMultiplier = 0;;) { - try { - removeRemoteWALs(peerId, remoteWALDir, remoteWals); - break; - } catch (IOException e) { - LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir, - peerId); - } - if (!source.isSourceActive()) { - // skip the following operations - return; - } - if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries, - sleepMultiplier, maxRetriesMultiplier)) { - sleepMultiplier++; - } - } - } - } - String queueId = source.getQueueId(); - for (String wal : wals) { - interruptOrAbortWhenFail( - () -> this.queueStorage.removeWAL(server.getServerName(), queueId, wal)); - } - } - // public because of we call it in TestReplicationEmptyWALRecovery @VisibleForTesting public void preLogRoll(Path newLog) throws IOException { @@ -1092,10 +963,6 @@ private void addHFileRefs(String peerId, TableName tableName, byte[] family, } } - public void cleanUpHFileRefs(String peerId, List files) { - interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files)); - } - int activeFailoverTaskCount() { return executor.getActiveCount(); } @@ -1104,20 +971,13 @@ MetricsReplicationGlobalSourceSource getGlobalMetrics() { return this.globalMetrics; } - private NavigableSet getWalsToRemove(String queueId, String log, boolean inclusive) { - NavigableSet walsToRemove = new TreeSet<>(); - String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log); - try { - this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId).forEach(wal -> { - String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal); - if (walPrefix.equals(logPrefix)) { - walsToRemove.add(wal); - } - }); - } catch (ReplicationException e) { - // Just log the exception here, as the recovered replication source will try to cleanup again. - LOG.warn("Failed to read wals in queue {}", queueId, e); - } - return walsToRemove.headSet(log, inclusive); + @InterfaceAudience.Private + Server getServer() { + return this.server; + } + + @InterfaceAudience.Private + ReplicationQueueStorage getQueueStorage() { + return this.queueStorage; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 72cc5e82b699..4250c76fd6d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -240,12 +240,6 @@ private void shipEdits(WALEntryBatch entryBatch) { } private void cleanUpHFileRefs(WALEdit edit) throws IOException { - String peerId = source.getPeerId(); - if (peerId.contains("-")) { - // peerClusterZnode will be in the form peerId + "-" + rsZNode. - // A peerId will not have "-" in its name, see HBASE-11394 - peerId = peerId.split("-")[0]; - } List cells = edit.getCells(); int totalCells = cells.size(); for (int i = 0; i < totalCells; i++) { @@ -256,7 +250,7 @@ private void cleanUpHFileRefs(WALEdit edit) throws IOException { int totalStores = stores.size(); for (int j = 0; j < totalStores; j++) { List storeFileList = stores.get(j).getStoreFileList(); - source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList); + source.cleanUpHFileRefs(storeFileList); source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size()); } } @@ -268,10 +262,11 @@ private boolean updateLogPosition(WALEntryBatch batch) { // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file // record on zk, so let's call it. The last wal position maybe zero if end of file is true and // there is no entry in the batch. It is OK because that the queue storage will ignore the zero - // position and the file will be removed soon in cleanOldLogs. - if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) || - batch.getLastWalPosition() != currentPosition) { - source.getSourceManager().logPositionAndCleanOldLogs(source, batch); + // position and the file will be removed soon in cleanOldWALs. + if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) + || batch.getLastWalPosition() != currentPosition) { + source.setWALPosition(batch); + source.cleanOldWALs(batch.getLastWalPath().getName(), batch.isEndOfFile()); updated = true; } // if end of file is true, then we can just skip to the next file in queue. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index c71db1bf785b..22cbd97d33af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -25,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -74,9 +73,6 @@ class ReplicationSourceWALReader extends Thread { //Indicates whether this particular worker is running private boolean isReaderRunning = true; - private AtomicLong totalBufferUsed; - private long totalBufferQuota; - /** * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * entries, and puts them on a batch queue. @@ -102,8 +98,6 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf, // memory used will be batchSizeCapacity * (nb.batches + 1) // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); - this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed(); - this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit(); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = @@ -273,9 +267,10 @@ public Path getCurrentPath() { //returns false if we've already exceeded the global quota private boolean checkQuota() { // try not to go over total quota - if (totalBufferUsed.get() > totalBufferQuota) { + if (source.manager.getTotalBufferUsed().get() > source.manager.getTotalBufferLimit()) { LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", - this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota); + this.source.getPeerId(), source.manager.getTotalBufferUsed().get(), + source.manager.getTotalBufferLimit()); Threads.sleep(sleepForRetries); return false; } @@ -404,10 +399,10 @@ private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { * @return true if we should clear buffer and push all */ private boolean acquireBufferQuota(long size) { - long newBufferUsed = totalBufferUsed.addAndGet(size); + long newBufferUsed = source.manager.getTotalBufferUsed().addAndGet(size); // Record the new buffer usage - this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); - return newBufferUsed >= totalBufferQuota; + source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + return newBufferUsed >= source.manager.getTotalBufferLimit(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java index 9edcc8a17a54..2108ddc16358 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.concurrent.PriorityBlockingQueue; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java index 4f96c96d3c5d..591b44de931b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -31,7 +31,7 @@ * Holds a batch of WAL entries to replicate, along with some statistics */ @InterfaceAudience.Private -class WALEntryBatch { +public class WALEntryBatch { // used by recovered replication queue to indicate that all the entries have been read. public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 781a1da16242..b75a7ed3ab88 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch; import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -38,7 +39,6 @@ */ public class ReplicationSourceDummy implements ReplicationSourceInterface { - private ReplicationSourceManager manager; private ReplicationPeer replicationPeer; private String peerClusterId; private Path currentPath; @@ -47,11 +47,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { private AtomicBoolean startup = new AtomicBoolean(false); @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, - UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) - throws IOException { - this.manager = manager; + public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, + ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { this.peerClusterId = peerClusterId; this.metrics = metrics; this.walFileLengthProvider = walFileLengthProvider; @@ -132,11 +131,6 @@ public ReplicationEndpoint getReplicationEndpoint() { return null; } - @Override - public ReplicationSourceManager getSourceManager() { - return manager; - } - @Override public void tryThrottle(int batchSize) throws InterruptedException { } @@ -155,6 +149,14 @@ public ServerName getServerWALsBelongTo() { return null; } + @Override + public void setWALPosition(WALEntryBatch entryBatch) { + } + + @Override + public void cleanOldWALs(String walName, boolean inclusive) { + } + @Override public ReplicationPeer getPeer() { return replicationPeer; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 15f202f06467..a58d1a39a576 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -126,11 +126,11 @@ public void testDefaultSkipsMetaWAL() throws IOException { thenReturn(DoNothingReplicationEndpoint.class.getName()); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, + rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(), new MetricsSource(queueId)); try { rs.startup(); @@ -164,11 +164,11 @@ public void testWALEntryFilter() throws IOException { thenReturn(DoNothingReplicationEndpoint.class.getName()); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, manager, null, mockPeer, rss, queueId, + rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, uuid, p -> OptionalLong.empty(), new MetricsSource(queueId)); try { rs.startup(); @@ -255,8 +255,8 @@ public void testTerminateTimeout() throws Exception { Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - source.init(testConf, null, manager, null, mockPeer, null, "testPeer", + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + source.init(testConf, null, null, manager, null, mockPeer, null, "testPeer", null, p -> OptionalLong.empty(), null); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 8e38114fa0a5..4b685ce42039 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; +import java.util.OptionalLong; import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; @@ -333,8 +334,9 @@ public void testLogRoll() throws Exception { when(source.getQueueId()).thenReturn("1"); when(source.isRecovered()).thenReturn(false); when(source.isSyncReplication()).thenReturn(false); - manager.logPositionAndCleanOldLogs(source, - new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath())); + WALEntryBatch batch = new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()); + source.setWALPosition(batch); + source.cleanOldWALs(batch.getLastWalPath().getName(), batch.isEndOfFile()); wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), @@ -408,11 +410,10 @@ public void testCleanupFailoverQueues() throws Exception { assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); - ReplicationSourceInterface source = mock(ReplicationSourceInterface.class); - when(source.getQueueId()).thenReturn(id); - when(source.isRecovered()).thenReturn(true); - when(source.isSyncReplication()).thenReturn(false); - manager.cleanOldLogs(file2, false, source); + ReplicationSourceInterface source = new ReplicationSource(); + source.init(conf, fs, null, manager, manager.getQueueStorage(), rp1.getPeer("1"), + manager.getServer(), id, null, p -> OptionalLong.empty(), null); + source.cleanOldWALs(file2, false); // log1 should be deleted assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); } @@ -589,19 +590,15 @@ public void testRemovePeerMetricsCleanup() throws Exception { } } - private ReplicationSourceInterface mockReplicationSource(String peerId) { - ReplicationSourceInterface source = mock(ReplicationSourceInterface.class); - when(source.getPeerId()).thenReturn(peerId); - when(source.getQueueId()).thenReturn(peerId); - when(source.isRecovered()).thenReturn(false); - when(source.isSyncReplication()).thenReturn(true); + private ReplicationPeer mockReplicationPeerForSyncReplication(String peerId) { ReplicationPeerConfig config = mock(ReplicationPeerConfig.class); when(config.getRemoteWALDir()) .thenReturn(remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString()); + when(config.isSyncReplication()).thenReturn(true); ReplicationPeer peer = mock(ReplicationPeer.class); when(peer.getPeerConfig()).thenReturn(config); - when(source.getPeer()).thenReturn(peer); - return source; + when(peer.getId()).thenReturn(peerId); + return peer; } @Test @@ -630,13 +627,19 @@ public void testRemoveRemoteWALs() throws Exception { manager.preLogRoll(wal); manager.postLogRoll(wal); - ReplicationSourceInterface source = mockReplicationSource(peerId2); - manager.cleanOldLogs(walName, true, source); + ReplicationSourceInterface source = new ReplicationSource(); + source.init(conf, fs, null, manager, manager.getQueueStorage(), + mockReplicationPeerForSyncReplication(peerId2), manager.getServer(), peerId2, null, + p -> OptionalLong.empty(), null); + source.cleanOldWALs(walName, true); // still there if peer id does not match assertTrue(fs.exists(remoteWAL)); - source = mockReplicationSource(slaveId); - manager.cleanOldLogs(walName, true, source); + source = new ReplicationSource(); + source.init(conf, fs, null, manager, manager.getQueueStorage(), + mockReplicationPeerForSyncReplication(slaveId), manager.getServer(), slaveId, null, + p -> OptionalLong.empty(), null); + source.cleanOldWALs(walName, true); assertFalse(fs.exists(remoteWAL)); } finally { removePeerAndWait(peerId2); @@ -813,11 +816,10 @@ private int isLogZnodesMapPopulated() { static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy { - @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, - UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) - throws IOException { + @Override public void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp, + Server server, String peerClusterId, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { throw new IOException("Failing deliberately"); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 63e7a8b90496..9410604f5d7c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -369,23 +369,27 @@ public void testWALKeySerialization() throws Exception { } private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { - ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); - when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); - when(mockSourceManager.getTotalBufferLimit()).thenReturn( - (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); Server mockServer = Mockito.mock(Server.class); ReplicationSource source = Mockito.mock(ReplicationSource.class); - when(source.getSourceManager()).thenReturn(mockSourceManager); when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getServer()).thenReturn(mockServer); when(source.isRecovered()).thenReturn(recovered); - MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock( - MetricsReplicationGlobalSourceSource.class); - when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); + source.manager = mockReplicationSourceManager(); return source; } + private ReplicationSourceManager mockReplicationSourceManager() { + ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); + MetricsReplicationGlobalSourceSource globalMetrics = + Mockito.mock(MetricsReplicationGlobalSourceSource.class); + when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); + when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + when(mockSourceManager.getTotalBufferLimit()) + .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + return mockSourceManager; + } + private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { ReplicationSource source = mockReplicationSource(recovered, conf); when(source.isPeerEnabled()).thenReturn(true);