[HBASE-25539] Add age of oldest wal metric
Rushabh committed Feb 18, 2021
1 parent 2d26c94 commit 484df88
Showing 11 changed files with 465 additions and 105 deletions.
@@ -50,6 +50,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
/* Used to track the age of the oldest wal, in ms, since its creation time */
String OLDEST_WAL_AGE = "source.oldestWalAge";

void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);
@@ -74,5 +76,6 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void incrCompletedWAL();
void incrCompletedRecoveryQueue();
void incrFailedRecoveryQueue();

void setOldestWalAge(long age);
long getOldestWalAge();
}
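The new pair of methods makes the metric a read/write gauge rather than a counter. A minimal sketch of how a caller might compute the value, in milliseconds since the oldest WAL's creation; the names queue and metricsSource are illustrative, and the actual call site lives in the new ReplicationSourceLogQueue class, which is among the files not rendered on this page:

// Illustrative only: HBase WAL file names end in ".<creationTimestampMillis>",
// so the age of the oldest WAL is "now" minus the timestamp parsed from its
// name. getTS() is made public further down in this diff, presumably for this.
Path oldestWal = queue.peek();                  // head of the per-group log queue
long oldestTs = ReplicationSource.getTS(oldestWal);
long oldestWalAge = EnvironmentEdgeManager.currentTime() - oldestTs;
metricsSource.setOldestWalAge(oldestWalAge);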
@@ -196,6 +196,18 @@ public void incrCompletedRecoveryQueue() {
public void incrFailedRecoveryQueue() {
failedRecoveryQueue.incr(1L);
}

@Override
public void setOldestWalAge(long age) {
// Not implemented
}

@Override
public long getOldestWalAge() {
// Not implemented
return 0;
}

@Override
public void init() {
rms.init();
@@ -40,6 +40,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logReadInBytesKey;
private final String shippedHFilesKey;
private final String sizeOfHFileRefsQueueKey;
private final String oldestWalAgeKey;

private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -67,6 +68,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter repeatedFileBytes;
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableGaugeLong oldestWalAge;

public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;
@@ -126,6 +128,9 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri

completedRecoveryKey = this.keyPrefix + "completedRecoverQueues";
completedRecoveryQueue = rms.getMetricsRegistry().getCounter(completedRecoveryKey, 0L);

oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
}

@Override public void setLastShippedAge(long age) {
@@ -191,6 +196,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
rms.removeMetric(repeatedBytesKey);
rms.removeMetric(completedLogsKey);
rms.removeMetric(completedRecoveryKey);
rms.removeMetric(oldestWalAgeKey);
}

@Override
@@ -256,6 +262,14 @@ public void incrCompletedRecoveryQueue() {
@Override
public void incrFailedRecoveryQueue() {/*no op*/}

@Override public void setOldestWalAge(long age) {
oldestWalAge.set(age);
}

@Override public long getOldestWalAge() {
return oldestWalAge.value();
}

@Override
public void init() {
rms.init();
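Since the gauge registers under the source's key prefix, it should surface as source.<id>.oldestWalAge alongside the other per-peer metrics. A JUnit-style round-trip sketch, assuming the usual compatibility-factory wiring; the test setup here is illustrative and not part of the rendered diff:

// Illustrative test sketch: set the new gauge and read it back.
MetricsReplicationSourceSource source =
    CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
        .getSource("testPeer");
source.setOldestWalAge(123L);                 // write through the new setter
assertEquals(123L, source.getOldestWalAge()); // the gauge reads back the same value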
@@ -395,6 +395,17 @@ public void updateHistogram(String name, long value) {
globalSourceSource.updateHistogram(name, value);
}

/*
 * Sets the age of the oldest log file for this source only; the
 * corresponding global source metric is left untouched.
 */
public void setOldestWalAge(long age) {
singleSourceSource.setOldestWalAge(age);
}

public long getOldestWalAge() {
return singleSourceSource.getOldestWalAge();
}

@Override
public String getMetricsContext() {
return globalSourceSource.getMetricsContext();
@@ -26,7 +26,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -89,10 +88,7 @@
public class ReplicationSource extends Thread implements ReplicationSourceInterface {

private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
// Queues of logs to process, entry in format of walGroupId->queue,
// each represents a queue for one wal group
private Map<String, PriorityBlockingQueue<Path>> queues =
new HashMap<String, PriorityBlockingQueue<Path>>();
protected ReplicationSourceLogQueue logQueue;
// per group queue size, keep no more than this number of logs in each wal group
private int queueSizePerGroup;
private ReplicationQueues replicationQueues;
@@ -125,8 +121,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private volatile boolean sourceRunning = false;
// Metrics for this source
private MetricsSource metrics;
//WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
// ReplicationEndpoint which will handle the actual replication
private ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries.
@@ -175,6 +169,7 @@ public void init(final Configuration conf, final FileSystem fs,
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
this.manager = manager;
@@ -186,7 +181,6 @@ public void init(final Configuration conf, final FileSystem fs,
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;

defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
@@ -207,16 +201,14 @@ private void decorateConf() {
@Override
public void enqueueLog(Path log) {
String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
if (queue == null) {
queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
queues.put(logPrefix, queue);
boolean queueExists = logQueue.enqueueLog(log, logPrefix);
if (!queueExists) {
if (this.sourceRunning) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before starting the worker
final ReplicationSourceShipperThread worker =
new ReplicationSourceShipperThread(logPrefix, queue, replicationQueueInfo, this);
new ReplicationSourceShipperThread(logPrefix, logQueue, replicationQueueInfo, this);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(logPrefix, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
@@ -226,14 +218,6 @@ public void enqueueLog(Path log) {
}
}
}
queue.put(log);
this.metrics.incrSizeOfLogQueue();
// This will log a warning for each new log that gets created above the warn threshold
int queueSize = queue.size();
if (queueSize > this.logQueueWarnThreshold) {
LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
+ " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
}
}

@Override
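The queue bookkeeping that used to live inline here (queue creation, the sizeOfLogQueue counter, and the warn-threshold log) moves into the new ReplicationSourceLogQueue class, whose file is not rendered on this page. A reconstruction of its enqueueLog, inferred only from the call sites above; field names such as queues, metrics, and logQueueWarnThreshold are assumptions:

// Sketch, not the committed code: shape inferred from the call sites in this diff.
public boolean enqueueLog(Path wal, String walGroupId) {
  boolean exists = true; // did a queue for this wal group already exist?
  PriorityBlockingQueue<Path> queue = queues.get(walGroupId);
  if (queue == null) {
    exists = false;
    queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
    queues.put(walGroupId, queue);
  }
  queue.put(wal);
  metrics.incrSizeOfLogQueue();
  // The replication.source.log.queue.warn check removed above presumably moves here.
  if (queue.size() > logQueueWarnThreshold) {
    LOG.warn("WAL group " + walGroupId + " queue size: " + queue.size()
        + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
  }
  return exists;
}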
@@ -325,11 +309,11 @@ public void run() {
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
// start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : logQueue.getQueues().entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
final ReplicationSourceShipperThread worker =
new ReplicationSourceShipperThread(walGroupId, queue, replicationQueueInfo, this);
new ReplicationSourceShipperThread(walGroupId, logQueue, replicationQueueInfo, this);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
@@ -480,7 +464,7 @@ public int compare(Path o1, Path o2) {
* @param p path to split
* @return start time
*/
private static long getTS(Path p) {
public static long getTS(Path p) {
int tsIndex = p.getName().lastIndexOf('.') + 1;
return Long.parseLong(p.getName().substring(tsIndex));
}
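getTS changes from private to public here, presumably so the new ReplicationSourceLogQueue can derive a WAL's age from its file name. A quick worked example with a made-up WAL name:

// WAL file names carry their creation timestamp after the last '.':
Path p = new Path("host1%2C16020%2C1613670000000.1613681234567");
long ts = ReplicationSource.getTS(p);                   // 1613681234567L
long ageMs = EnvironmentEdgeManager.currentTime() - ts; // age of this WAL in ms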
@@ -527,7 +511,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
String walGroupId = worker.getWalGroupId();
lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
int queueSize = queues.get(walGroupId).size();
int queueSize = logQueue.getQueueSize(walGroupId);
replicationDelay =
ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
Path currentPath = worker.getLastLoggedPath();
@@ -563,7 +547,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
public class ReplicationSourceShipperThread extends Thread {
ReplicationSourceInterface source;
String walGroupId;
PriorityBlockingQueue<Path> queue;
ReplicationSourceLogQueue logQueue;
ReplicationQueueInfo replicationQueueInfo;
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;
@@ -574,10 +558,10 @@ public class ReplicationSourceShipperThread extends Thread {
ReplicationSourceWALReaderThread entryReader;

public ReplicationSourceShipperThread(String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
ReplicationSourceLogQueue logQueue, ReplicationQueueInfo replicationQueueInfo,
ReplicationSourceInterface source) {
this.walGroupId = walGroupId;
this.queue = queue;
this.logQueue = logQueue;
this.replicationQueueInfo = replicationQueueInfo;
this.source = source;
}
@@ -835,11 +819,11 @@ public void uncaughtException(final Thread t, final Throwable e) {
// normally has a position (unless the RS failed between 2 logs)
private long getRecoveredQueueStartPos(long startPosition) {
try {
startPosition =
(replicationQueues.getLogPosition(peerClusterZnode, this.queue.peek().getName()));
startPosition = (replicationQueues.getLogPosition(peerClusterZnode,
this.logQueue.getQueue(walGroupId).peek().getName()));
if (LOG.isTraceEnabled()) {
LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+ startPosition);
LOG.trace("Recovered queue started with log " +
this.logQueue.getQueue(walGroupId).peek() + " at position " + startPosition);
}
} catch (ReplicationException e) {
terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
@@ -853,8 +837,9 @@ private void startWALReaderThread(String threadName, Thread.UncaughtExceptionHan
ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue,
startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this);
entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, logQueue,
startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this,
this.walGroupId);
Threads.setDaemonThreadRunning(entryReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
handler);
@@ -866,6 +851,7 @@ private void locateRecoveredPaths() throws IOException {
boolean hasPathChanged = false;
PriorityBlockingQueue<Path> newPaths =
new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
pathsLoop: for (Path path : queue) {
if (fs.exists(path)) { // still in same location, don't need to do anything
newPaths.add(path);
@@ -915,9 +901,9 @@ private void locateRecoveredPaths() throws IOException {
// put the correct locations in the queue
// since this is a recovered queue with no new incoming logs,
// there shouldn't be any concurrency issues
queue.clear();
logQueue.clear(walGroupId);
for (Path path : newPaths) {
queue.add(path);
logQueue.enqueueLog(path, walGroupId);
}
}
}