diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java
new file mode 100644
index 000000000000..4abbc7ae38d1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.Map;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Chore that periodically recomputes the age of the oldest WAL queued on a
+ * {@link ReplicationSource} and publishes it through that source's {@link MetricsSource}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MetricsReplicationSourceRefresherChore extends ScheduledChore {
+
+  /** Configuration key for the refresh period of this chore. */
+  public static final String DURATION = "hbase.metrics.replication.source.refresher.duration";
+  /** Default refresh period, in milliseconds. */
+  public static final int DEFAULT_DURATION_MILLISECONDS = 60000;
+
+  private final ReplicationSource replicationSource;
+  private final MetricsSource metrics;
+
+  /**
+   * Creates a chore that runs with the {@link #DEFAULT_DURATION_MILLISECONDS} period.
+   * @param stopper passed through to {@link ScheduledChore}
+   * @param replicationSource source whose WAL queues are inspected
+   */
+  public MetricsReplicationSourceRefresherChore(Stoppable stopper,
+      ReplicationSource replicationSource) {
+    this(DEFAULT_DURATION_MILLISECONDS, stopper, replicationSource);
+  }
+
+  /**
+   * Creates a chore that runs with the given period.
+   * @param duration period handed to {@link ScheduledChore}
+   * @param stopper passed through to {@link ScheduledChore}
+   * @param replicationSource source whose WAL queues are inspected
+   */
+  public MetricsReplicationSourceRefresherChore(int duration, Stoppable stopper,
+      ReplicationSource replicationSource) {
+    super("MetricsSourceRefresherChore", stopper, duration);
+    this.replicationSource = replicationSource;
+    this.metrics = replicationSource.getSourceMetrics();
+  }
+
+  /** Pushes the freshly computed oldest-WAL age into the source metrics. */
+  @Override
+  protected void chore() {
+    this.metrics.setOldestWalAge(getOldestWalAge());
+  }
+
+  /**
+   * Computes the age of the oldest queued WAL as current time minus its timestamp.
+   * @return the age, or 0 when no queue holds a WAL
+   */
+  long getOldestWalAge() {
+    long oldestWalTimestamp = getOldestWalTimestamp();
+    if (oldestWalTimestamp == Long.MAX_VALUE) {
+      // No WAL is queued anywhere, so report an age of 0.
+      return 0;
+    }
+    return EnvironmentEdgeManager.currentTime() - oldestWalTimestamp;
+  }
+
+  /**
+   * Finds the smallest timestamp among the head WALs of all queues of the source.
+   * @return that timestamp, or {@link Long#MAX_VALUE} when every queue is empty
+   */
+  private long getOldestWalTimestamp() {
+    long oldestWalTimestamp = Long.MAX_VALUE;
+    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : this.replicationSource.getQueues()
+        .entrySet()) {
+      // peek() returns null when the queue is empty; such queues are skipped.
+      Path path = entry.getValue().peek();
+      if (path != null) {
+        oldestWalTimestamp =
+          Math.min(oldestWalTimestamp, AbstractFSWALProvider.WALStartTimeComparator.getTS(path));
+      }
+    }
+    return oldestWalTimestamp;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index d1268fab94cf..25e6f564612b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -224,6 +224,10 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true); + int duration = this.conf.getInt(MetricsReplicationSourceRefresherChore.DURATION, + MetricsReplicationSourceRefresherChore.DEFAULT_DURATION_MILLISECONDS); + this.server.getChoreService() + .scheduleChore(new MetricsReplicationSourceRefresherChore(duration, server, this)); LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, replicationPeer.getId(), this.currentBandwidth); }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java index 4d89edef5fdc..5a6ffbfa70ff 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java @@ -23,7 +23,6 @@ import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -31,8 +30,7 @@ import org.slf4j.LoggerFactory; /* - Class that does enqueueing/dequeuing of wal at one place so that we can update the metrics - just at one place. + Class that does enqueueing/dequeuing of wal at one place */ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -80,8 +78,6 @@ public boolean enqueueLog(Path wal, String walGroupId) { } // Increment size of logQueue this.metrics.incrSizeOfLogQueue(); - // Compute oldest wal age - this.metrics.setOldestWalAge(getOldestWalAge()); // This will wal a warning for each new wal that gets created above the warn threshold int queueSize = queue.size(); if (queueSize > this.logQueueWarnThreshold) { @@ -137,8 +133,6 @@ public void remove(String walGroupId) { queue.remove(); // Decrease size logQueue. this.metrics.decrSizeOfLogQueue(); - // Re-compute age of oldest wal metric. - this.metrics.setOldestWalAge(getOldestWalAge()); } /** @@ -152,39 +146,6 @@ public void clear(String walGroupId) { queue.remove(); metrics.decrSizeOfLogQueue(); } - this.metrics.setOldestWalAge(getOldestWalAge()); - } - - /* - Returns the age of oldest wal. - */ - long getOldestWalAge() { - long now = EnvironmentEdgeManager.currentTime(); - long timestamp = getOldestWalTimestamp(); - if (timestamp == Long.MAX_VALUE) { - // If there are no wals in the queue then set the oldest wal timestamp to current time - // so that the oldest wal age will be 0. 
- timestamp = now; - } - long age = now - timestamp; - return age; - } - - /* - Get the oldest wal timestamp from all the queues. - */ - private long getOldestWalTimestamp() { - long oldestWalTimestamp = Long.MAX_VALUE; - for (Map.Entry> entry : queues.entrySet()) { - PriorityBlockingQueue queue = entry.getValue(); - Path path = queue.peek(); - // Can path ever be null ? - if (path != null) { - oldestWalTimestamp = Math.min(oldestWalTimestamp, - AbstractFSWALProvider.WALStartTimeComparator.getTS(path)); - } - } - return oldestWalTimestamp; } public MetricsSource getMetrics() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index a6362f99d3a9..e90d5e6d264e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -162,7 +162,7 @@ public Connection getConnection() { @Override public ChoreService getChoreService() { - return null; + return new ChoreService(getClass().getSimpleName()); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index f0a85b580d4b..d4ddc6140784 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -251,7 +251,7 @@ public ExecutorService getExecutorService() { @Override public ChoreService getChoreService() { - return null; + return new ChoreService(getClass().getSimpleName()); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java
new file mode 100644
index 000000000000..c510e7b397c2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * Verifies that {@link MetricsReplicationSourceRefresherChore} reports the age of the oldest
+ * queued WAL to the {@link MetricsSource} of its replication source.
+ */
+@Category({ SmallTests.class })
+public class TestMetricsReplicationSourceRefresherChore {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMetricsReplicationSourceRefresherChore.class);
+
+  @Test
+  public void testOldestWalAgeMetricsRefresherCore() {
+    try {
+      HRegionServer rs = mock(HRegionServer.class);
+      ReplicationSource mockSource = Mockito.mock(ReplicationSource.class);
+      MetricsSource mockMetricsSource = Mockito.mock(MetricsSource.class);
+      Mockito.when(mockSource.getSourceMetrics()).thenReturn(mockMetricsSource);
+      ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
+      EnvironmentEdgeManager.injectEdge(manualEdge);
+      MetricsReplicationSourceRefresherChore mrsrChore =
+          new MetricsReplicationSourceRefresherChore(10, rs, mockSource);
+
+      // log1 carries timestamp 8; with the clock at 10 the expected age is 2.
+      manualEdge.setValue(10);
+      final Path log1 = new Path("log-walgroup-a.8");
+      String walGroupId1 = "fake-walgroup-id-1";
+      Map<String, PriorityBlockingQueue<Path>> queues = new ConcurrentHashMap<>();
+      PriorityBlockingQueue<Path> queue =
+          new PriorityBlockingQueue<>(1, new AbstractFSWALProvider.WALStartTimeComparator());
+      queue.put(log1);
+      queues.put(walGroupId1, queue);
+      Mockito.when(mockSource.getQueues()).thenReturn(queues);
+      mrsrChore.chore();
+      verify(mockMetricsSource, times(1)).setOldestWalAge(2);
+
+      // NOTE(review): log2 is added to the same queue object, which is then also mapped to
+      // walGroupId2; both logs carry timestamp 8, so with the clock at 20 the expected age is 12.
+      manualEdge.setValue(20);
+      final Path log2 = new Path("log-walgroup-b.8");
+      String walGroupId2 = "fake-walgroup-id-2";
+      queue.put(log2);
+      queues.put(walGroupId2, queue);
+      mrsrChore.chore();
+      verify(mockMetricsSource, times(1)).setOldestWalAge(12);
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index f5d4f7782947..6759ec771aac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -265,7 +265,9 @@ public void testTerminateTimeout() throws Exception { testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - source.init(testConf, null, manager, null, mockPeer, null, "testPeer", + RegionServerServices rss = + TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); + source.init(testConf, null, manager, null, mockPeer, rss, "testPeer", null, p -> OptionalLong.empty(), null); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit( @@ -289,7 +291,9 @@ public void testTerminateClearsBuffer() throws Exception { ReplicationPeer mockPeer = mock(ReplicationPeer.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); - source.init(testConf, null, mockManager, null, mockPeer, null, + RegionServerServices rss = 
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); + source.init(testConf, null, mockManager, null, mockPeer, rss, "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class)); ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null, conf, null, 0, null, source, null); @@ -639,6 +643,7 @@ public void testAgeOfOldestWal() throws Exception { String id = "1"; MetricsSource metrics = new MetricsSource(id); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt(MetricsReplicationSourceRefresherChore.DURATION, 1000); conf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); @@ -653,7 +658,6 @@ public void testAgeOfOldestWal() throws Exception { thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - ReplicationSource source = new ReplicationSource(); source.init(conf, null, manager, null, mockPeer, rss, id, null, p -> OptionalLong.empty(), metrics); @@ -662,12 +666,16 @@ public void testAgeOfOldestWal() throws Exception { manualEdge.setValue(10); // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. source.enqueueLog(log1); + // Sleep for chore to update WAL age + Thread.sleep(1000); MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id); assertEquals(2, metricsSource1.getOldestWalAge()); final Path log2 = new Path(logDir, "log-walgroup-b.4"); // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 source.enqueueLog(log2); + // Sleep for chore to update WAL age + Thread.sleep(1000); assertEquals(6, metricsSource1.getOldestWalAge()); // Clear all metrics. 
metrics.clear(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java index c28b18003c5b..9724f1d03907 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java @@ -25,8 +25,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -40,44 +38,34 @@ public class TestReplicationSourceLogQueue { HBaseClassTestRule.forClass(TestReplicationSourceLogQueue.class); /* - Testing enqueue and dequeuing of wal and check age of oldest wal. 
+ Testing enqueue and dequeuing of wal */ @Test public void testEnqueueDequeue() { - try { - String walGroupId1 = "fake-walgroup-id-1"; - String walGroupId2 = "fake-walgroup-id-2"; + String walGroupId1 = "fake-walgroup-id-1"; + String walGroupId2 = "fake-walgroup-id-2"; - ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); - EnvironmentEdgeManager.injectEdge(manualEdge); + MetricsSource metrics = new MetricsSource("1"); + Configuration conf = HBaseConfiguration.create(); + ReplicationSource source = mock(ReplicationSource.class); + Mockito.doReturn("peer").when(source).logPeerId(); + ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metrics, source); + final Path log1 = new Path("log-walgroup-a.8"); - MetricsSource metrics = new MetricsSource("1"); - Configuration conf = HBaseConfiguration.create(); - ReplicationSource source = mock(ReplicationSource.class); - Mockito.doReturn("peer").when(source).logPeerId(); - ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metrics, source); - final Path log1 = new Path("log-walgroup-a.8"); - manualEdge.setValue(10); - // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. - logQueue.enqueueLog(log1, walGroupId1); - assertEquals(2, logQueue.getOldestWalAge()); + logQueue.enqueueLog(log1, walGroupId1); + assertEquals(1, logQueue.getQueue(walGroupId1).size()); + final Path log2 = new Path("log-walgroup-b.4"); + logQueue.enqueueLog(log2, walGroupId2); + assertEquals(1, logQueue.getQueue(walGroupId2).size()); + assertEquals(2, logQueue.getNumQueues()); - final Path log2 = new Path("log-walgroup-b.4"); - // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 - logQueue.enqueueLog(log2, walGroupId2); - assertEquals(6, logQueue.getOldestWalAge()); - - // Remove an element from walGroupId2. 
- // After this op, there will be only one element in the queue log-walgroup-a.8 - logQueue.remove(walGroupId2); - assertEquals(2, logQueue.getOldestWalAge()); - - // Remove last element from the queue. - logQueue.remove(walGroupId1); - // This will test the case where there are no elements in the queue. - assertEquals(0, logQueue.getOldestWalAge()); - } finally { - EnvironmentEdgeManager.reset(); - } + // Remove an element from walGroupId2. + // After this op, there will be only one element in the queue log-walgroup-a.8 + logQueue.remove(walGroupId2); + assertEquals(0, logQueue.getQueue(walGroupId2).size()); + // Remove last element from the queue. + logQueue.remove(walGroupId1); + assertEquals(0, logQueue.getQueue(walGroupId1).size()); + // This will test the case where there are no elements in the queue. } }