diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index e740a01dc4f7..e9062472221c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -27,8 +27,9 @@ public class RecoveredReplicationSource extends ReplicationSource { @Override - protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) { - return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage, + protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId, + ReplicationSourceWALReader walReader) { + return new RecoveredReplicationSourceShipper(conf, walGroupId, this, walReader, queueStorage, () -> { if (workerThreads.isEmpty()) { this.getSourceMetrics().clear(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index 2bb3a7c3591c..ece566d96006 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -30,9 +30,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper private final Runnable tryFinish; public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId, - ReplicationSourceLogQueue logQueue, RecoveredReplicationSource source, + RecoveredReplicationSource source, ReplicationSourceWALReader walReader, ReplicationQueueStorage queueStorage, Runnable tryFinish) { - super(conf, walGroupId, logQueue, source); + super(conf, walGroupId, source, walReader); this.tryFinish = tryFinish; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index e4da44e9b13a..00be66c5c0fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -367,14 +367,13 @@ private void tryStartNewShipper(String walGroupId) { return value; } else { LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId); - ReplicationSourceShipper worker = createNewShipper(walGroupId); ReplicationSourceWALReader walReader = createNewWALReader(walGroupId, getStartOffset(walGroupId)); + ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader); Threads.setDaemonThreadRunning( walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader." + walGroupId + "," + queueId, (t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); - worker.setWALReader(walReader); worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); return worker; } @@ -428,8 +427,9 @@ private long getFileSize(Path currentPath) throws IOException { return fileSize; } - protected ReplicationSourceShipper createNewShipper(String walGroupId) { - return new ReplicationSourceShipper(conf, walGroupId, logQueue, this); + protected ReplicationSourceShipper createNewShipper(String walGroupId, + ReplicationSourceWALReader walReader) { + return new ReplicationSourceShipper(conf, walGroupId, this, walReader); } private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) { @@ -665,7 +665,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics) { terminate(reason, cause, clearMetrics, true); } - public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) { + private void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) { if (cause == null) { LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason); } else { @@ -684,9 +684,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); - if (worker.entryReader != null) { - worker.entryReader.setReaderRunning(false); - } + worker.entryReader.setReaderRunning(false); } if (this.replicationEndpoint != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 7b863dc35ae9..6d0730d76b6e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -55,9 +55,8 @@ public enum WorkerState { } private final Configuration conf; - protected final String walGroupId; - protected final ReplicationSourceLogQueue logQueue; - protected final ReplicationSource source; + final String walGroupId; + private final ReplicationSource source; // Last position in the log that we sent to ZooKeeper // It will be accessed by the stats thread so make it volatile @@ -66,22 +65,22 @@ public enum WorkerState { private Path currentPath; // Current state of the worker thread private volatile WorkerState state; - protected ReplicationSourceWALReader entryReader; + final ReplicationSourceWALReader entryReader; // How long should we sleep for each retry - protected final long sleepForRetries; + private final long sleepForRetries; // Maximum number of retries before taking bold actions - protected final int maxRetriesMultiplier; + private final int maxRetriesMultiplier; private final int DEFAULT_TIMEOUT = 20000; private final int getEntriesTimeout; private final int shipEditsTimeout; - public ReplicationSourceShipper(Configuration conf, String walGroupId, - ReplicationSourceLogQueue logQueue, ReplicationSource source) { + public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source, + ReplicationSourceWALReader walReader) { this.conf = conf; this.walGroupId = walGroupId; - this.logQueue = logQueue; this.source = source; + this.entryReader = walReader; // 1 second this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 5 minutes @ 1 sec per @@ -295,10 +294,6 @@ long getCurrentPosition() { return currentPosition; } - void setWALReader(ReplicationSourceWALReader entryReader) { - this.entryReader = entryReader; - } - protected boolean isActive() { return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 707bab875d22..53996c376647 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -291,8 +291,7 @@ public void testTerminateClearsBuffer() throws Exception { mock(MetricsSource.class)); ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null, conf, null, 0, null, source, null); - ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, null, source); - shipper.entryReader = reader; + ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, source, reader); source.workerThreads.put("testPeer", shipper); WALEntryBatch batch = new WALEntryBatch(10, logDir); WAL.Entry mockEntry = mock(WAL.Entry.class);