Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-25881: Create a chore to update age related metrics #3518

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

/**
* The Class MetricsReplicationSourceRefresherChore for refreshing age related replication source
* metrics
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MetricsReplicationSourceRefresherChore extends ScheduledChore {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReplicationSourceMetricsRefresherChore a better name?


private ReplicationSource replicationSource;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra lines.

private MetricsSource metrics;

public static final String DURATION = "hbase.metrics.replication.source.refresher.duration";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here also..
better name would be hbase.replication.source.metrics.refresher.duration
Or even
hbase.replication.source.metrics.refresher.period ?

public static final int DEFAULT_DURATION_MILLISECONDS = 60000;

public MetricsReplicationSourceRefresherChore(Stoppable stopper,
ReplicationSource replicationSource) {
this(DEFAULT_DURATION_MILLISECONDS, stopper, replicationSource);
}

public MetricsReplicationSourceRefresherChore(int duration, Stoppable stopper,
ReplicationSource replicationSource) {
super("MetricsSourceRefresherChore", stopper, duration);
this.replicationSource = replicationSource;
this.metrics = this.replicationSource.getSourceMetrics();
}

@Override
protected void chore() {
this.metrics.setOldestWalAge(getOldestWalAge());
}

/*
* Returns the age of oldest wal.
*/
long getOldestWalAge() {
Copy link
Contributor

@shahrs87 shahrs87 Aug 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rahulLiving There are many age related metrics in Replication subsytem. We want to update all the metrics from this Chore. I don't think it is a good idea to move the implementation from ReplicationSourceLogQueue to here.
What I had in my mind is: Compute oldestWalTimestamp (not the age, just the timestamp) in ReplicationSourceLogQueue and keep it as a class variable. Don't calculate the age there. Instead calculate the age in this chore. So for some reason if Replication thread is stuck, it will not update oldestWalTimestamp and since age is calculated in this chore, it will be updated. Does that make sense ?

long now = EnvironmentEdgeManager.currentTime();
long timestamp = getOldestWalTimestamp();
if (timestamp == Long.MAX_VALUE) {
// If there are no wals in the queue then set the oldest wal timestamp to current time
// so that the oldest wal age will be 0.
timestamp = now;
}
long age = now - timestamp;
return age;
}

/*
* Get the oldest wal timestamp from all the queues.
*/
private long getOldestWalTimestamp() {
long oldestWalTimestamp = Long.MAX_VALUE;
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : this.replicationSource.getQueues()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterate over valueSet

.entrySet()) {
PriorityBlockingQueue<Path> queue = entry.getValue();
Path path = queue.peek();
// Can path ever be null ?
if (path != null) {
oldestWalTimestamp =
Math.min(oldestWalTimestamp, AbstractFSWALProvider.WALStartTimeComparator.getTS(path));
}
}
return oldestWalTimestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort",
true);

int duration = this.conf.getInt(MetricsReplicationSourceRefresherChore.DURATION,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we creating a chore for every replication source ? If we have 2-3 peers, this will create that many chores. Instead is it possible to create just 1 chore and initialize with ReplicationSourceManager which has list of all replicationSources and just iterate over them every time this chore wakes up. Thoughts ?

MetricsReplicationSourceRefresherChore.DEFAULT_DURATION_MILLISECONDS);
this.server.getChoreService()
.scheduleChore(new MetricsReplicationSourceRefresherChore(duration, server, this));
LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
replicationPeer.getId(), this.currentBandwidth);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,14 @@
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
Class that does enqueueing/dequeuing of wal at one place so that we can update the metrics
just at one place.
Class that does enqueueing/dequeuing of wal at one place
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
Expand Down Expand Up @@ -80,8 +78,6 @@ public boolean enqueueLog(Path wal, String walGroupId) {
}
// Increment size of logQueue
this.metrics.incrSizeOfLogQueue();
// Compute oldest wal age
this.metrics.setOldestWalAge(getOldestWalAge());
// This will wal a warning for each new wal that gets created above the warn threshold
int queueSize = queue.size();
if (queueSize > this.logQueueWarnThreshold) {
Expand Down Expand Up @@ -137,8 +133,6 @@ public void remove(String walGroupId) {
queue.remove();
// Decrease size logQueue.
this.metrics.decrSizeOfLogQueue();
// Re-compute age of oldest wal metric.
this.metrics.setOldestWalAge(getOldestWalAge());
}

/**
Expand All @@ -152,39 +146,6 @@ public void clear(String walGroupId) {
queue.remove();
metrics.decrSizeOfLogQueue();
}
this.metrics.setOldestWalAge(getOldestWalAge());
}

/*
Returns the age of oldest wal.
*/
long getOldestWalAge() {
long now = EnvironmentEdgeManager.currentTime();
long timestamp = getOldestWalTimestamp();
if (timestamp == Long.MAX_VALUE) {
// If there are no wals in the queue then set the oldest wal timestamp to current time
// so that the oldest wal age will be 0.
timestamp = now;
}
long age = now - timestamp;
return age;
}

/*
Get the oldest wal timestamp from all the queues.
*/
private long getOldestWalTimestamp() {
long oldestWalTimestamp = Long.MAX_VALUE;
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
PriorityBlockingQueue<Path> queue = entry.getValue();
Path path = queue.peek();
// Can path ever be null ?
if (path != null) {
oldestWalTimestamp = Math.min(oldestWalTimestamp,
AbstractFSWALProvider.WALStartTimeComparator.getTS(path));
}
}
return oldestWalTimestamp;
}

public MetricsSource getMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public Connection getConnection() {

@Override
public ChoreService getChoreService() {
return null;
return new ChoreService(getClass().getSimpleName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public ExecutorService getExecutorService() {

@Override
public ChoreService getChoreService() {
return null;
return new ChoreService(getClass().getSimpleName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

@Category({ SmallTests.class })
public class TestMetricsReplicationSourceRefresherChore {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMetricsReplicationSourceRefresherChore.class);

@Test
public void testOldestWalAgeMetricsRefresherCore() {
try {
HRegionServer rs = mock(HRegionServer.class);
ReplicationSource mockSource = Mockito.mock(ReplicationSource.class);
MetricsSource mockMetricsSource = Mockito.mock(MetricsSource.class);
Mockito.when(mockSource.getSourceMetrics()).thenReturn(mockMetricsSource);
ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(manualEdge);
MetricsReplicationSourceRefresherChore mrsrChore =
new MetricsReplicationSourceRefresherChore(10, rs, mockSource);

manualEdge.setValue(10);
final Path log1 = new Path("log-walgroup-a.8");
String walGroupId1 = "fake-walgroup-id-1";
Map<String, PriorityBlockingQueue<Path>> queues = new ConcurrentHashMap<>();
PriorityBlockingQueue<Path> queue =
new PriorityBlockingQueue<>(1, new AbstractFSWALProvider.WALStartTimeComparator());
queue.put(log1);
queues.put(walGroupId1, queue);
Mockito.when(mockSource.getQueues()).thenReturn(queues);
mrsrChore.chore();
verify(mockMetricsSource, times(1)).setOldestWalAge(2);

manualEdge.setValue(20);
final Path log2 = new Path("log-walgroup-b.8");
String walGroupId2 = "fake-walgroup-id-2";
queue.put(log2);
queues.put(walGroupId2, queue);
mrsrChore.chore();
verify(mockMetricsSource, times(1)).setOldestWalAge(12);
} finally {
EnvironmentEdgeManager.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,9 @@ public void testTerminateTimeout() throws Exception {
testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
source.init(testConf, null, manager, null, mockPeer, null, "testPeer",
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
source.init(testConf, null, manager, null, mockPeer, rss, "testPeer",
null, p -> OptionalLong.empty(), null);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(
Expand All @@ -289,7 +291,9 @@ public void testTerminateClearsBuffer() throws Exception {
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
Configuration testConf = HBaseConfiguration.create();
source.init(testConf, null, mockManager, null, mockPeer, null,
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
source.init(testConf, null, mockManager, null, mockPeer, rss,
"testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
conf, null, 0, null, source, null);
Expand Down Expand Up @@ -639,6 +643,7 @@ public void testAgeOfOldestWal() throws Exception {
String id = "1";
MetricsSource metrics = new MetricsSource(id);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt(MetricsReplicationSourceRefresherChore.DURATION, 1000);
conf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
Expand All @@ -653,7 +658,6 @@ public void testAgeOfOldestWal() throws Exception {
thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));

ReplicationSource source = new ReplicationSource();
source.init(conf, null, manager, null, mockPeer, rss, id, null,
p -> OptionalLong.empty(), metrics);
Expand All @@ -662,12 +666,16 @@ public void testAgeOfOldestWal() throws Exception {
manualEdge.setValue(10);
// Diff of current time (10) and log-walgroup-a.8 timestamp will be 2.
source.enqueueLog(log1);
// Sleep for chore to update WAL age
Thread.sleep(1000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Waiter.waitFor method to wait on some condition. That way if the jenkins machine are slow, it will not create flaky tests.

MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id);
assertEquals(2, metricsSource1.getOldestWalAge());

final Path log2 = new Path(logDir, "log-walgroup-b.4");
// Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
source.enqueueLog(log2);
// Sleep for chore to update WAL age
Thread.sleep(1000);
assertEquals(6, metricsSource1.getOldestWalAge());
// Clear all metrics.
metrics.clear();
Expand Down
Loading