diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index e1d3de9d513b..5ffd1d6eabcc 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -979,6 +979,8 @@ public enum OperationStatusCode {
/*
* cluster replication constants.
*/
+ public static final String REPLICATION_OFFLOAD_ENABLE_KEY = "hbase.replication.offload.enabled";
+ public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false;
public static final String
REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
index f040bf999336..6ecbb4670927 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
@@ -33,5 +33,5 @@ public interface ReplicationListener {
* A region server has been removed from the local cluster
* @param regionServer the removed region server
*/
- public void regionServerRemoved(String regionServer);
+ void regionServerRemoved(String regionServer);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
new file mode 100644
index 000000000000..5bb9dd6f3585
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Used to control all replication sources inside one RegionServer or ReplicationServer.
+ * Used by {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSource} or
+ * {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public interface ReplicationSourceController {
+
+ /**
+ * Returns the maximum size in bytes of edits held in memory which are pending replication
+ * across all sources inside this RegionServer or ReplicationServer.
+ */
+ long getTotalBufferLimit();
+
+ AtomicLong getTotalBufferUsed();
+
+ MetricsReplicationGlobalSourceSource getGlobalMetrics();
+
+ /**
+ * Call this when the recovered replication source replicated all WALs.
+ */
+ void finishRecoveredSource(RecoveredReplicationSource src);
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index e3400ad38f25..eece3c0fd5ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
@@ -45,18 +46,15 @@ public class RecoveredReplicationSource extends ReplicationSource {
private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
- private Path walDir;
-
private String actualPeerId;
@Override
- public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
- ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
- MetricsSource metrics) throws IOException {
- super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
- clusterId, walFileLengthProvider, metrics);
- this.walDir = walDir;
+ public void init(Configuration conf, FileSystem fs, Path walDir,
+ ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+ ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+ super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server,
+ peerClusterZnode, clusterId, walFileLengthProvider, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
@@ -149,7 +147,7 @@ private Path getReplSyncUpPath(Path path) throws IOException {
void tryFinish() {
if (workerThreads.isEmpty()) {
this.getSourceMetrics().clear();
- manager.finishRecoveredSource(this);
+ controller.finishRecoveredSource(this);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 473abfdeacc8..848081ac3800 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -59,6 +59,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
@@ -99,8 +100,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected Configuration conf;
protected ReplicationQueueInfo replicationQueueInfo;
- // The manager of all sources to which we ping back our progress
- ReplicationSourceManager manager;
+ protected Path walDir;
+
+ protected ReplicationSourceController controller;
// Should we stop everything?
protected Server server;
// How long should we sleep for each retry
@@ -187,23 +189,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
}
- /**
- * Instantiation method used by region servers
- * @param conf configuration to use
- * @param fs file system to use
- * @param manager replication manager to ping to
- * @param server the server for this region server
- * @param queueId the id of our replication queue
- * @param clusterId unique UUID for the cluster
- * @param metrics metrics for replication source
- */
@Override
- public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
- ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
- MetricsSource metrics) throws IOException {
+ public void init(Configuration conf, FileSystem fs, Path walDir,
+ ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+ ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
this.server = server;
this.conf = HBaseConfiguration.create(conf);
+ this.walDir = walDir;
this.waitOnEndpointSeconds =
this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
decorateConf();
@@ -214,7 +207,7 @@ public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSour
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
this.queueStorage = queueStorage;
this.replicationPeer = replicationPeer;
- this.manager = manager;
+ this.controller = overallController;
this.fs = fs;
this.metrics = metrics;
this.clusterId = clusterId;
@@ -767,9 +760,9 @@ public void postShipEdits(List entries, int batchSize) {
throttler.addPushSize(batchSize);
}
totalReplicatedEdits.addAndGet(entries.size());
- long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
+ long newBufferUsed = controller.getTotalBufferUsed().addAndGet(-batchSize);
// Record the new buffer usage
- this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+ controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 321edc2bf08b..f3bf8a41ff90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
@@ -44,14 +45,22 @@ public interface ReplicationSourceInterface {
/**
* Initializer for the source
*
- * @param conf the configuration to use
- * @param fs the file system to use
- * @param server the server for this region server
- */
- void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
- ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
- MetricsSource metrics) throws IOException;
+ * @param conf configuration to use
+ * @param fs file system to use
+ * @param walDir the directory where the WAL is located
+ * @param overallController the overall controller of all replication sources
+ * @param queueStorage the replication queue storage
+ * @param replicationPeer the replication peer
+ * @param server the server which start and run this replication source
+ * @param queueId the id of our replication queue
+ * @param clusterId unique UUID for the cluster
+ * @param walFileLengthProvider used to get the WAL length
+ * @param metrics metrics for this replication source
+ */
+ void init(Configuration conf, FileSystem fs, Path walDir,
+ ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+ ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
/**
* Add a log to the list of logs to replicate
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 32126978e1ee..de9e21f99ae7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
@@ -92,7 +93,7 @@
*
*/
@InterfaceAudience.Private
-public class ReplicationSourceManager implements ReplicationListener {
+public class ReplicationSourceManager implements ReplicationListener, ReplicationSourceController {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
// all the sources that read this RS's logs and every peer only has one replication source
private final ConcurrentMap sources;
@@ -126,18 +127,18 @@ public class ReplicationSourceManager implements ReplicationListener {
private AtomicLong totalBufferUsed = new AtomicLong();
- // How long should we sleep for each retry when deleting remote wal files for sync replication
- // peer.
- private final long sleepForRetries;
- // Maximum number of retries before taking bold actions when deleting remote wal files for sync
- // replication peer.
- private final int maxRetriesMultiplier;
// Total buffer size on this RegionServer for holding batched edits to be shipped.
private final long totalBufferLimit;
private final MetricsReplicationGlobalSourceSource globalMetrics;
private final Map sourceMetrics = new HashMap<>();
+ /**
+ * When enable replication offload, will not create replication source and only write WAL to
+ * replication queue storage. The replication source will be started by ReplicationServer.
+ */
+ private final boolean replicationOffload;
+
/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param queueStorage the interface for manipulating replication queues
@@ -186,12 +187,11 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
this.latestPaths = new HashMap<>();
this.replicationForBulkLoadDataEnabled = conf.getBoolean(
HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
- this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
- this.maxRetriesMultiplier =
- this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
this.globalMetrics = globalMetrics;
+ this.replicationOffload = conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT);
}
/**
@@ -212,6 +212,47 @@ Future> init() throws IOException {
return this.executor.submit(this::adoptAbandonedQueues);
}
+ @VisibleForTesting
+ @Override
+ public AtomicLong getTotalBufferUsed() {
+ return totalBufferUsed;
+ }
+
+ @Override
+ public long getTotalBufferLimit() {
+ return totalBufferLimit;
+ }
+
+ @Override
+ public void finishRecoveredSource(RecoveredReplicationSource src) {
+ synchronized (oldsources) {
+ if (!removeRecoveredSource(src)) {
+ return;
+ }
+ }
+ LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
+ src.getStats());
+ }
+
+ @Override
+ public MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+ return this.globalMetrics;
+ }
+
+ /**
+ * Clear the metrics and related replication queue of the specified old source
+ * @param src source to clear
+ */
+ private boolean removeRecoveredSource(ReplicationSourceInterface src) {
+ if (!this.oldsources.remove(src)) {
+ return false;
+ }
+ LOG.info("Done with the recovered queue {}", src.getQueueId());
+ // Delete queue from storage and memory
+ deleteQueue(src.getQueueId());
+ return true;
+ }
+
private void adoptAbandonedQueues() {
List currentReplicators = null;
try {
@@ -331,8 +372,7 @@ private ReplicationSourceInterface createSource(String queueId, ReplicationPeer
* @param peerId the id of the replication peer
* @return the source that was created
*/
- @VisibleForTesting
- ReplicationSourceInterface addSource(String peerId) throws IOException {
+ void addSource(String peerId) throws IOException {
ReplicationPeer peer = replicationPeers.getPeer(peerId);
ReplicationSourceInterface src = createSource(peerId, peer);
// synchronized on latestPaths to avoid missing the new log
@@ -354,8 +394,9 @@ ReplicationSourceInterface addSource(String peerId) throws IOException {
if (peerConfig.isSyncReplication()) {
syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
}
- src.startup();
- return src;
+ if (!replicationOffload) {
+ src.startup();
+ }
}
/**
@@ -373,7 +414,11 @@ ReplicationSourceInterface addSource(String peerId) throws IOException {
*
* @param peerId the id of the sync replication peer
*/
- public void drainSources(String peerId) throws IOException, ReplicationException {
+ void drainSources(String peerId) throws IOException, ReplicationException {
+ if (replicationOffload) {
+ throw new ReplicationException(
+ "Should not add use sync replication when replication offload enabled");
+ }
String terminateMessage = "Sync replication peer " + peerId +
" is transiting to STANDBY. Will close the previous replication source and open a new one";
ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -430,7 +475,7 @@ public void drainSources(String peerId) throws IOException, ReplicationException
* replication queue storage and only to enqueue all logs to the new replication source
* @param peerId the id of the replication peer
*/
- public void refreshSources(String peerId) throws ReplicationException, IOException {
+ void refreshSources(String peerId) throws ReplicationException, IOException {
String terminateMessage = "Peer " + peerId +
" state or config changed. Will close the previous replication source and open a new one";
ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -447,7 +492,9 @@ public void refreshSources(String peerId) throws ReplicationException, IOExcepti
.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
}
LOG.info("Startup replication source for " + src.getPeerId());
- src.startup();
+ if (!replicationOffload) {
+ src.startup();
+ }
List toStartup = new ArrayList<>();
// synchronized on oldsources to avoid race with NodeFailoverWorker
@@ -470,41 +517,18 @@ public void refreshSources(String peerId) throws ReplicationException, IOExcepti
toStartup.add(recoveredReplicationSource);
}
}
- for (ReplicationSourceInterface replicationSource : toStartup) {
- replicationSource.startup();
- }
- }
-
- /**
- * Clear the metrics and related replication queue of the specified old source
- * @param src source to clear
- */
- private boolean removeRecoveredSource(ReplicationSourceInterface src) {
- if (!this.oldsources.remove(src)) {
- return false;
- }
- LOG.info("Done with the recovered queue {}", src.getQueueId());
- // Delete queue from storage and memory
- deleteQueue(src.getQueueId());
- return true;
- }
-
- void finishRecoveredSource(ReplicationSourceInterface src) {
- synchronized (oldsources) {
- if (!removeRecoveredSource(src)) {
- return;
+ if (!replicationOffload) {
+ for (ReplicationSourceInterface replicationSource : toStartup) {
+ replicationSource.startup();
}
}
- LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
- src.getStats());
}
/**
* Clear the metrics and related replication queue of the specified old source
* @param src source to clear
*/
- void removeSource(ReplicationSourceInterface src) {
- LOG.info("Done with the queue " + src.getQueueId());
+ private void removeSource(ReplicationSourceInterface src) {
this.sources.remove(src.getPeerId());
// Delete queue from storage and memory
deleteQueue(src.getQueueId());
@@ -548,8 +572,7 @@ private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) thro
}
}
- // public because of we call it in TestReplicationEmptyWALRecovery
- @VisibleForTesting
+ @InterfaceAudience.Private
public void preLogRoll(Path newLog) throws IOException {
String logName = newLog.getName();
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
@@ -567,9 +590,8 @@ public void preLogRoll(Path newLog) throws IOException {
}
}
- // public because of we call it in TestReplicationEmptyWALRecovery
- @VisibleForTesting
- public void postLogRoll(Path newLog) throws IOException {
+ @InterfaceAudience.Private
+ public void postLogRoll(Path newLog) {
// This only updates the sources we own, not the recovered ones
for (ReplicationSourceInterface source : this.sources.values()) {
source.enqueueLog(newLog);
@@ -739,7 +761,9 @@ public void run() {
LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId());
src.enqueueLog(new Path(oldLogDir, wal));
}
- src.startup();
+ if (!replicationOffload) {
+ src.startup();
+ }
}
} catch (IOException e) {
// TODO manage it
@@ -849,19 +873,6 @@ Set getLastestPath() {
}
}
- @VisibleForTesting
- public AtomicLong getTotalBufferUsed() {
- return totalBufferUsed;
- }
-
- /**
- * Returns the maximum size in bytes of edits held in memory which are pending replication
- * across all sources inside this RegionServer.
- */
- public long getTotalBufferLimit() {
- return totalBufferLimit;
- }
-
/**
* Get the directory where wals are archived
* @return the directory where wals are archived
@@ -967,10 +978,6 @@ int activeFailoverTaskCount() {
return executor.getActiveCount();
}
- MetricsReplicationGlobalSourceSource getGlobalMetrics() {
- return this.globalMetrics;
- }
-
@InterfaceAudience.Private
Server getServer() {
return this.server;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 22cbd97d33af..7b7d0d830f97 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -267,10 +267,11 @@ public Path getCurrentPath() {
//returns false if we've already exceeded the global quota
private boolean checkQuota() {
// try not to go over total quota
- if (source.manager.getTotalBufferUsed().get() > source.manager.getTotalBufferLimit()) {
+ if (source.controller.getTotalBufferUsed().get() > source.controller
+ .getTotalBufferLimit()) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
- this.source.getPeerId(), source.manager.getTotalBufferUsed().get(),
- source.manager.getTotalBufferLimit());
+ this.source.getPeerId(), source.controller.getTotalBufferUsed().get(),
+ source.controller.getTotalBufferLimit());
Threads.sleep(sleepForRetries);
return false;
}
@@ -399,10 +400,10 @@ private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(long size) {
- long newBufferUsed = source.manager.getTotalBufferUsed().addAndGet(size);
+ long newBufferUsed = source.controller.getTotalBufferUsed().addAndGet(size);
// Record the new buffer usage
- source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
- return newBufferUsed >= source.manager.getTotalBufferLimit();
+ source.controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+ return newBufferUsed >= source.controller.getTotalBufferLimit();
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index b75a7ed3ab88..66059c722cb3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -29,7 +29,6 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -40,21 +39,21 @@
public class ReplicationSourceDummy implements ReplicationSourceInterface {
private ReplicationPeer replicationPeer;
- private String peerClusterId;
+ private String queueId;
private Path currentPath;
private MetricsSource metrics;
private WALFileLengthProvider walFileLengthProvider;
private AtomicBoolean startup = new AtomicBoolean(false);
@Override
- public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
- ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
- UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
- throws IOException {
- this.peerClusterId = peerClusterId;
+ public void init(Configuration conf, FileSystem fs, Path walDir,
+ ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+ ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+ this.queueId = queueId;
this.metrics = metrics;
this.walFileLengthProvider = walFileLengthProvider;
- this.replicationPeer = rp;
+ this.replicationPeer = replicationPeer;
}
@Override
@@ -96,14 +95,14 @@ public void terminate(String reason, Exception e, boolean clearMetrics) {
@Override
public String getQueueId() {
- return peerClusterId;
+ return queueId;
}
@Override
public String getPeerId() {
- String[] parts = peerClusterId.split("-", 2);
+ String[] parts = queueId.split("-", 2);
return parts.length != 1 ?
- parts[0] : peerClusterId;
+ parts[0] : queueId;
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 4b685ce42039..0e0353fc14da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -43,6 +43,7 @@
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -72,6 +73,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -816,10 +818,11 @@ private int isLogZnodesMapPopulated() {
static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
- @Override public void init(Configuration conf, FileSystem fs, Path walDir,
- ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp,
- Server server, String peerClusterId, UUID clusterId,
- WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+ @Override
+ public void init(Configuration conf, FileSystem fs, Path walDir,
+ ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+ ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException{
throw new IOException("Failing deliberately");
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 9410604f5d7c..bafabb0e5dfe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -57,6 +57,7 @@
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -375,19 +376,19 @@ private ReplicationSource mockReplicationSource(boolean recovered, Configuration
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered);
- source.manager = mockReplicationSourceManager();
+ source.controller = mockReplicationSourceController();
return source;
}
- private ReplicationSourceManager mockReplicationSourceManager() {
- ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+ private ReplicationSourceController mockReplicationSourceController() {
+ ReplicationSourceController controller = Mockito.mock(ReplicationSourceController.class);
MetricsReplicationGlobalSourceSource globalMetrics =
Mockito.mock(MetricsReplicationGlobalSourceSource.class);
- when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
- when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
- when(mockSourceManager.getTotalBufferLimit())
+ when(controller.getGlobalMetrics()).thenReturn(globalMetrics);
+ when(controller.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ when(controller.getTotalBufferLimit())
.thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
- return mockSourceManager;
+ return controller;
}
private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {