diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index e9062472221c..e47df36e3aa2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -26,6 +26,22 @@
@InterfaceAudience.Private
public class RecoveredReplicationSource extends ReplicationSource {
+ @Override
+ protected void startShippers() {
+ for (String walGroupId : logQueue.getQueues().keySet()) {
+ workerThreads.put(walGroupId, createNewShipper(walGroupId));
+ }
+ // Start the shippers only after workerThreads is fully populated. The postFinish logic below
+ // marks the RecoveredReplicationSource as finished once workerThreads is empty. So if we
+ // started each worker on the fly, a shipper could finish its work and call postFinish, find
+ // that workerThreads is empty, and mark the RecoveredReplicationSource as finished while
+ // the next shipper has not been added to workerThreads yet.
+ // See HBASE-28155 for more details.
+ for (ReplicationSourceShipper shipper : workerThreads.values()) {
+ startShipper(shipper);
+ }
+ }
+
@Override
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
ReplicationSourceWALReader walReader) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 4c864e5e4502..094fa4aaa786 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -360,6 +360,19 @@ private long getStartOffset(String walGroupId) {
}
}
+ protected final ReplicationSourceShipper createNewShipper(String walGroupId) {
+ ReplicationSourceWALReader walReader =
+ createNewWALReader(walGroupId, getStartOffset(walGroupId));
+ ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
+ Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
+ + ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
+ return worker;
+ }
+
+ protected final void startShipper(ReplicationSourceShipper worker) {
+ worker.startup(this::retryRefreshing);
+ }
+
private void tryStartNewShipper(String walGroupId) {
workerThreads.compute(walGroupId, (key, value) -> {
if (value != null) {
@@ -367,12 +380,8 @@ private void tryStartNewShipper(String walGroupId) {
return value;
} else {
LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
- ReplicationSourceWALReader walReader =
- createNewWALReader(walGroupId, getStartOffset(walGroupId));
- ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
- Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
- + ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
- worker.startup(this::retryRefreshing);
+ ReplicationSourceShipper worker = createNewShipper(walGroupId);
+ startShipper(worker);
return worker;
}
});
@@ -522,7 +531,7 @@ private long getCurrentBandwidth() {
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
*/
- protected boolean sleepForRetries(String msg, int sleepMultiplier) {
+ private boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
@@ -605,10 +614,14 @@ private void initialize() {
queueId, logQueue.getNumQueues(), clusterId, peerClusterId);
initializeWALEntryFilter(peerClusterId);
// Start workers
+ startShippers();
+ setSourceStartupStatus(false);
+ }
+
+ protected void startShippers() {
for (String walGroupId : logQueue.getQueues().keySet()) {
tryStartNewShipper(walGroupId);
}
- setSourceStartupStatus(false);
}
private synchronized void setSourceStartupStatus(boolean initializing) {