diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 69671c13b611..f78eba3140cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -272,10 +272,14 @@ public void incrLogReadInBytes(long readInBytes) { /** Removes all metrics about this Source. */ public void clear() { + terminate(); + singleSourceSource.clear(); + } + + public void terminate() { int lastQueueSize = singleSourceSource.getSizeOfLogQueue(); globalSourceSource.decrSizeOfLogQueue(lastQueueSize); singleSourceSource.decrSizeOfLogQueue(lastQueueSize); - singleSourceSource.clear(); globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); lastShippedTimeStamps.clear(); lastHFileRefsQueueSize = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index a827a2555fb0..1942a014ba3c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -719,10 +719,13 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool } } } - if (clearMetrics) { - // Can be null in test context. - if (this.metrics != null) { + + // Can be null in test context. + if (this.metrics != null) { + if (clearMetrics) { this.metrics.clear(); + } else { + this.metrics.terminate(); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index ba48207a0881..3f88838b7f11 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -370,7 +370,8 @@ public void refreshSources(String peerId) throws IOException { ReplicationSourceInterface toRemove = this.sources.remove(peerId); if (toRemove != null) { LOG.info("Terminate replication source for " + toRemove.getPeerId()); - toRemove.terminate(terminateMessage, null, true); + // Do not clear metrics + toRemove.terminate(terminateMessage, null, false); } src = createSource(peerId, peer); this.sources.put(peerId, src); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 0ec2b0a8be8a..ac553de178d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -92,6 +92,8 @@ public void terminate(String reason, Exception e) { public void terminate(String reason, Exception e, boolean clearMetrics) { if (clearMetrics) { this.metrics.clear(); + } else { + this.metrics.terminate(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 550159eeb7f4..268aa65ab18c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -554,6 +554,41 @@ public void testRemovePeerMetricsCleanup() throws Exception { } } + @Test + public void testDisablePeerMetricsCleanup() throws Exception { + final String peerId = "DummyPeer"; + final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() + .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build(); + try { + MetricsReplicationSourceSource globalSource = getGlobalSource(); + final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); + final long sizeOfLatestPath = getSizeOfLatestPath(); + addPeerAndWait(peerId, peerConfig, true); + assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); + ReplicationSourceInterface source = manager.getSource(peerId); + // Sanity check + assertNotNull(source); + final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); + // Enqueue log and check if metrics updated + source.enqueueLog(new Path("abc")); + assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); + assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, + globalSource.getSizeOfLogQueue()); + + // Refreshing the peer should decrement the global and single source metrics + manager.refreshSources(peerId); + assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); + + source = manager.getSource(peerId); + assertNotNull(source); + assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); + assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, + globalSource.getSizeOfLogQueue()); + } finally { + removePeerAndWait(peerId); + } + } + /** * Add a peer and wait for it to initialize * @param waitForSource Whether to wait for replication source to initialize