Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HBASE-25539] Add age of oldest wal metric #2945

Merged
merged 9 commits into from
Feb 19, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,18 @@ public void incrCompletedRecoveryQueue() {
public void incrFailedRecoveryQueue() {
failedRecoveryQueue.incr(1L);
}

@Override
public void setOldestWalAge(long age) {
// Not implemented
}

@Override
public long getOldestWalAge() {
// Not implemented
return 0;
}

@Override
public void init() {
rms.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
/* Used to track the age of oldest wal */
String OLDEST_WAL_AGE = "source.oldestWalAge";

void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);
Expand Down Expand Up @@ -76,4 +78,6 @@ public interface MetricsReplicationSourceSource extends BaseSource {
long getWALEditsRead();
long getShippedOps();
long getEditsFiltered();
void setOldestWalAge(long age);
bharathv marked this conversation as resolved.
Show resolved Hide resolved
long getOldestWalAge();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logReadInBytesKey;
private final String shippedHFilesKey;
private final String sizeOfHFileRefsQueueKey;
private final String oldestWalAgeKey;

private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge;
Expand All @@ -65,6 +66,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter repeatedFileBytes;
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableGaugeLong oldestWalAge;

public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;
Expand Down Expand Up @@ -121,6 +123,9 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri

completedRecoveryKey = this.keyPrefix + "completedRecoverQueues";
completedRecoveryQueue = rms.getMetricsRegistry().getCounter(completedRecoveryKey, 0L);

oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
}

@Override public void setLastShippedAge(long age) {
Expand Down Expand Up @@ -183,6 +188,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
rms.removeMetric(repeatedBytesKey);
rms.removeMetric(completedLogsKey);
rms.removeMetric(completedRecoveryKey);
rms.removeMetric(oldestWalAgeKey);
}

@Override
Expand Down Expand Up @@ -248,6 +254,14 @@ public void incrCompletedRecoveryQueue() {
@Override
public void incrFailedRecoveryQueue() {/*no op*/}

@Override public void setOldestWalAge(long age) {
oldestWalAge.set(age);
}

@Override public long getOldestWalAge() {
return oldestWalAge.value();
}

@Override
public void init() {
rms.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,17 @@ public void incrFailedRecoveryQueue() {
globalSourceSource.incrFailedRecoveryQueue();
}

/*
Sets the age of oldest log file just for source.
*/
public void setOldestWalAge(long age) {
singleSourceSource.setOldestWalAge(age);
}

public long getOldestWalAge() {
return singleSourceSource.getOldestWalAge();
}

@Override
public void init() {
singleSourceSource.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
}

@Override
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
PriorityBlockingQueue<Path> queue) {
return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) {
return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage);
}

public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
public void locateRecoveredPaths(String walGroupId) throws IOException {
boolean hasPathChanged = false;
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
PriorityBlockingQueue<Path> newPaths = new PriorityBlockingQueue<Path>(queueSizePerGroup,
new AbstractFSWALProvider.WALStartTimeComparator());
pathsLoop: for (Path path : queue) {
Expand Down Expand Up @@ -116,9 +116,9 @@ public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOExc
// put the correct locations in the queue
// since this is a recovered queue with no new incoming logs,
// there shouldn't be any concurrency issues
queue.clear();
logQueue.clear(walGroupId);
for (Path path : newPaths) {
queue.add(path);
logQueue.enqueueLog(path, walGroupId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Threads;
Expand All @@ -40,9 +38,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
private final ReplicationQueueStorage replicationQueues;

public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
ReplicationLogQueue logQueue, RecoveredReplicationSource source,
ReplicationQueueStorage queueStorage) {
super(conf, walGroupId, queue, source);
super(conf, walGroupId, logQueue, source);
this.source = source;
this.replicationQueues = queueStorage;
}
Expand All @@ -58,7 +56,7 @@ public long getStartPosition() {
int numRetries = 0;
while (numRetries <= maxRetriesMultiplier) {
try {
source.locateRecoveredPaths(queue);
source.locateRecoveredPaths(walGroupId);
break;
} catch (IOException e) {
LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
Expand All @@ -75,9 +73,9 @@ private long getRecoveredQueueStartPos() {
String peerClusterZNode = source.getQueueId();
try {
startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(),
peerClusterZNode, this.queue.peek().getName());
LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(),
startPosition);
peerClusterZNode, this.logQueue.getQueue(walGroupId).peek().getName());
LOG.trace("Recovered queue started with log {} at position {}",
this.logQueue.getQueue(walGroupId).peek(), startPosition);
} catch (ReplicationException e) {
terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

License

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saintstack This is still in draft mode. Wanted to run tests first before I submit for review. Thank you !


import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
Class that does enqueueing/dequeueing of wal at one place so that we can update the metrics
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: dequeuing typo

just at one place.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReplicationLogQueue {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Class comment on what this thing does

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saintstack This is still in draft mode. Wanted to run tests first before I submit for review. Thank you !

// Queues of logs to process, entry in format of walGroupId->queue,
// each presents a queue for one wal group
private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
private MetricsSource metrics;
private Configuration conf;
// per group queue size, keep no more than this number of logs in each wal group
private int queueSizePerGroup;
// WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
private ReplicationSource source;
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: static finals to the top of the class.



public ReplicationLogQueue(Configuration conf, MetricsSource metrics, ReplicationSource source) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: thinking out loud, should we rename it to ReplicationSourceLogQueue to better convey that this is per source across all walGroups?

this.conf = conf;
this.metrics = metrics;
this.source = source;
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
}

/**
* Enqueue the wal
* @param wal wal to be enqueued
* @param walGroupId Key for the wal in @queues map
* @return boolean whether this is the first time we are seeing this walGroupId.
*/
public boolean enqueueLog(Path wal, String walGroupId) {
boolean exists = false;
PriorityBlockingQueue<Path> queue = queues.get(walGroupId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit concerned that this is not thread-safe. It wasn't before the patch too but this seems prone to weird concurrent modification issues. Fix while we are here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shahrs87 Missed this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed queues implementation from HashMap to ConcurrentHashMap. I thought your concern was regarding queues data structure. But reading the comment again, think you were concerned about PriorityBlockingQueue within map ?

if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup,
new AbstractFSWALProvider.WALStartTimeComparator());
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
// the shipper may quit immediately
queue.put(wal);
queues.put(walGroupId, queue);
} else {
exists = true;
queue.put(wal);
}
// Increment size of logQueue
this.metrics.incrSizeOfLogQueue();
// Compute oldest wal age
setOldestWalAge();
// This will wal a warning for each new wal that gets created above the warn threshold
int queueSize = queue.size();
if (queueSize > this.logQueueWarnThreshold) {
LOG.warn("{} WAL group {} queue size: {} exceeds value of " +
"replication.source.log.queue.warn {}", source.logPeerId(), walGroupId, queueSize,
logQueueWarnThreshold);
}
return exists;
}

/**
* Get the queue size for the given walGroupId.
* @param walGroupId walGroupId
*/
public int getQueueSize(String walGroupId) {
Queue queue = queues.get(walGroupId);
if (queue == null) {
return 0;
}
return queue.size();
}

/**
* Returns number of queues.
*/
public int getNumQueues() {
return queues.size();
}

public Map<String, PriorityBlockingQueue<Path>> getQueues() {
return queues;
}

/**
* Return queue for the given walGroupId
* Please don't add or remove elements from the returned queue.
* Use @enqueueLog and @remove methods respectively.
* @param walGroupId walGroupId
*/
public PriorityBlockingQueue<Path> getQueue(String walGroupId) {
return queues.get(walGroupId);
}

/**
* Remove head from the queue corresponding to given walGroupId.
* @param walGroupId walGroupId
*/
public void remove(String walGroupId) {
PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
if (queue == null || queue.isEmpty()) {
return;
}
queue.remove();
// Decrease size logQueue.
metrics.decrSizeOfLogQueue();
// Re-compute age of oldest wal metric.
setOldestWalAge();
}

/**
* Remove all the elements from the queue corresponding to walGroupId
* @param walGroupId walGroupId
*/
public void clear(String walGroupId) {
PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
while (!queue.isEmpty()) {
// Need to iterate since metrics#decrSizeOfLogQueue decrements just by 1.
queue.remove();
metrics.decrSizeOfLogQueue();
}
setOldestWalAge();
}

private void setOldestWalAge() {
long now = EnvironmentEdgeManager.currentTime();
long timestamp = getOldestWalTimestamp();
// TODO: Should we handle the case where getOldestWalTimestamp returns Long.MAX_VALUE ?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think we should? Otherwise on empty queue we get false alarms?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT there would be atleast one source with active wal at the head of the queue always. In any case add code to handle Long.MAX_VALUE scenario.

long age = now - timestamp;
this.metrics.setOldestWalAge(age);
}

/*
Get the oldest wal timestamp from all the queues.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indent

private long getOldestWalTimestamp() {
long oldestWalTimestamp = Long.MAX_VALUE;
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an O(n) loop but should be ok because the no. of walGroups is typically in a few 100s max?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. That is the reason I was ok with O(n) loop.

PriorityBlockingQueue<Path> queue = entry.getValue();
Path path = queue.peek();
// Can path ever be null ?
if (path != null) {
oldestWalTimestamp = Math.min(oldestWalTimestamp,
AbstractFSWALProvider.WALStartTimeComparator.getTS(path));
}
}
return oldestWalTimestamp;
}
}
Loading