HBASE-24681 Remove the cache walsById/walsByIdRecoveredQueues from ReplicationSourceManager (#2019)

Signed-off-by: Wellington Chevreuil <[email protected]>
infraio committed Jul 21, 2020
1 parent bb9cae1 commit fae9f0c
Showing 1 changed file with 62 additions and 142 deletions.
@@ -91,30 +91,6 @@
* <li>No need synchronized on {@link #sources}. {@link #sources} is a ConcurrentHashMap and there
* is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
* operations.</li>
* <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
* {@link #addPeer(String)}, {@link #removePeer(String)},
* {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and {@link #preLogRoll(Path)}.
* {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
* {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
* {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
* is called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
* {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
* remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
* case need synchronized is {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
* {@link #preLogRoll(Path)}.</li>
* <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
* modify it, {@link #removePeer(String)} ,
* {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
* {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
* {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
* {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
* {@link ReplicationSourceInterface} firstly, then remove the wals from
* {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
* will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start up a
* {@link ReplicationSourceInterface}. So there is no race here. For
* {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there
* is already synchronized on {@link #oldsources}. So no need synchronized on
* {@link #walsByIdRecoveredQueues}.</li>
* <li>Need synchronized on {@link #latestPaths} to avoid a newly opened source missing new logs.</li>
* <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
* to-be-removed peer.</li>
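
For context, here is a minimal, self-contained sketch of the locking pattern the surviving notes describe. The class and method names are hypothetical condensations, not the real HBase methods, and String stands in for the Hadoop Path type:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical condensation of the latestPaths synchronization above: adding a
// source and rolling a log take the same monitor, so a new source sees either
// the pre-roll or the post-roll latest WAL set, never a half-updated one.
class LatestPathsLockSketch {
  private final Map<String, String> latestPaths = new HashMap<>();

  // Stands in for addSource(String): enqueue the latest WAL of every group
  // under the lock, so a concurrent roll cannot slip in between.
  void addSourceSketch(Consumer<String> enqueueLog) {
    synchronized (latestPaths) {
      latestPaths.values().forEach(enqueueLog);
    }
  }

  // Stands in for preLogRoll(Path): publish the rolled WAL under the same
  // monitor before any new source can read latestPaths.
  void preLogRollSketch(String walGroup, String newLog) {
    synchronized (latestPaths) {
      latestPaths.put(walGroup, newLog);
    }
  }
}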
@@ -135,15 +111,6 @@ public class ReplicationSourceManager implements ReplicationListener {
// All about stopping
private final Server server;

// All logs we are currently tracking
// Index structure of the map is: queue_id->logPrefix/logGroup->logs
// For normal replication source, the peer id is same with the queue id
private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
// Logs for recovered sources we are currently tracking
// the map is: queue_id->logPrefix/logGroup->logs
// For recovered source, the queue id's format is peer_id-servername-*
private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;

private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;

private final Configuration conf;
@@ -195,8 +162,6 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker;
this.server = server;
this.walsById = new ConcurrentHashMap<>();
this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
this.oldsources = new ArrayList<>();
this.conf = conf;
this.fs = fs;
@@ -331,7 +296,6 @@ public void removePeer(String peerId) {
// Delete queue from storage and memory and queue id is same with peer id for normal
// source
deleteQueue(peerId);
this.walsById.remove(peerId);
}
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
if (peerConfig.isSyncReplication()) {
@@ -372,15 +336,10 @@ ReplicationSourceInterface addSource(String peerId) throws IOException {
// synchronized on latestPaths to avoid missing the new log
synchronized (this.latestPaths) {
this.sources.put(peerId, src);
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
this.walsById.put(peerId, walsByGroup);
// Add the latest wal to that source's queue
if (!latestPaths.isEmpty()) {
for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
Path walPath = walPrefixAndPath.getValue();
NavigableSet<String> wals = new TreeSet<>();
wals.add(walPath.getName());
walsByGroup.put(walPrefixAndPath.getKey(), wals);
// Abort the RS and throw an exception so that adding the peer fails
abortAndThrowIOExceptionWhenFail(
() -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
@@ -434,7 +393,10 @@ public void drainSources(String peerId) throws IOException, ReplicationException
// map from walsById since later we may fail to delete them from the replication queue
// storage, and when we retry next time, we can not know the wal files that need to be deleted
// from the replication queue storage.
walsById.get(peerId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
this.queueStorage.getWALsInQueue(this.server.getServerName(), peerId).forEach(wal -> {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
wals.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
});
}
LOG.info("Startup replication source for " + src.getPeerId());
src.startup();
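
The drainSources change above replaces the removed walsById lookup by regrouping the queue's WAL names on the fly. A standalone sketch of that grouping, assuming WAL names shaped like "<walGroupPrefix>.<sequenceNumber>"; prefixOf is a hypothetical stand-in for AbstractFSWALProvider.getWALPrefixFromWALName:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

class WalGroupingSketch {
  // Hypothetical stand-in for AbstractFSWALProvider.getWALPrefixFromWALName,
  // assuming "<prefix>.<sequenceNumber>" names.
  static String prefixOf(String walName) {
    int dot = walName.lastIndexOf('.');
    return dot < 0 ? walName : walName.substring(0, dot);
  }

  public static void main(String[] args) {
    List<String> walsInQueue = List.of(
        "rs1%2C16020%2C1595238226037.1595238230010",
        "rs1%2C16020%2C1595238226037.1595238230011",
        "rs2%2C16020%2C1595238226042.1595238230007");
    Map<String, NavigableSet<String>> wals = new HashMap<>();
    for (String wal : walsInQueue) {
      // Same computeIfAbsent idiom as the diff above.
      wals.computeIfAbsent(prefixOf(wal), p -> new TreeSet<>()).add(wal);
    }
    // Prints one sorted group per WAL prefix.
    wals.forEach((prefix, names) -> System.out.println(prefix + " -> " + names));
  }
}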
@@ -443,15 +405,6 @@ public void drainSources(String peerId) throws IOException, ReplicationException
queueStorage.removeWAL(server.getServerName(), peerId, wal);
}
}
synchronized (walsById) {
Map<String, NavigableSet<String>> oldWals = walsById.get(peerId);
wals.forEach((k, v) -> {
NavigableSet<String> walsByGroup = oldWals.get(k);
if (walsByGroup != null) {
walsByGroup.removeAll(v);
}
});
}
// synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is
// a background task, we will delete the file from replication queue storage under the lock to
// simplify the logic.
@@ -463,7 +416,6 @@ public void drainSources(String peerId) throws IOException, ReplicationException
oldSource.terminate(terminateMessage);
oldSource.getSourceMetrics().clear();
queueStorage.removeQueue(server.getServerName(), queueId);
walsByIdRecoveredQueues.remove(queueId);
iter.remove();
}
}
@@ -476,7 +428,7 @@ public void drainSources(String peerId) throws IOException, ReplicationException
* replication queue storage and only to enqueue all logs to the new replication source
* @param peerId the id of the replication peer
*/
public void refreshSources(String peerId) throws IOException {
public void refreshSources(String peerId) throws ReplicationException, IOException {
String terminateMessage = "Peer " + peerId +
" state or config changed. Will close the previous replication source and open a new one";
ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -489,9 +441,8 @@ public void refreshSources(String peerId) throws IOException {
// Do not clear metrics
toRemove.terminate(terminateMessage, null, false);
}
for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
}
this.queueStorage.getWALsInQueue(this.server.getServerName(), peerId)
.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
}
LOG.info("Startup replication source for " + src.getPeerId());
src.startup();
@@ -512,9 +463,8 @@ public void refreshSources(String peerId) throws IOException {
for (String queueId : previousQueueIds) {
ReplicationSourceInterface replicationSource = createSource(queueId, peer);
this.oldsources.add(replicationSource);
for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal)));
}
this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)
.forEach(wal -> src.enqueueLog(new Path(wal)));
toStartup.add(replicationSource);
}
}
@@ -534,7 +484,6 @@ private boolean removeRecoveredSource(ReplicationSourceInterface src) {
LOG.info("Done with the recovered queue {}", src.getQueueId());
// Delete queue from storage and memory
deleteQueue(src.getQueueId());
this.walsByIdRecoveredQueues.remove(src.getQueueId());
return true;
}

@@ -557,8 +506,6 @@ void removeSource(ReplicationSourceInterface src) {
this.sources.remove(src.getPeerId());
// Delete queue from storage and memory
deleteQueue(src.getQueueId());
this.walsById.remove(src.getQueueId());

}

/**
@@ -644,42 +591,19 @@ public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
* @param source the replication source
*/
@VisibleForTesting
void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
if (source.isRecovered()) {
NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
if (wals != null) {
NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
if (walsToRemove.isEmpty()) {
return;
}
cleanOldLogs(walsToRemove, source);
walsToRemove.clear();
}
} else {
NavigableSet<String> wals;
NavigableSet<String> walsToRemove;
// synchronized on walsById to avoid race with preLogRoll
synchronized (this.walsById) {
wals = walsById.get(source.getQueueId()).get(logPrefix);
if (wals == null) {
return;
}
walsToRemove = wals.headSet(log, inclusive);
if (walsToRemove.isEmpty()) {
return;
}
walsToRemove = new TreeSet<>(walsToRemove);
}
// cleanOldLogs may spend some time, especially for sync replication where we may want to
// remove remote wals as the remote cluster may already be down, so we do it outside
// the lock to avoid blocking preLogRoll
cleanOldLogs(walsToRemove, source);
// now let's remove the files in the set
synchronized (this.walsById) {
wals.removeAll(walsToRemove);
}
void cleanOldLogs(String log, boolean inclusive,
ReplicationSourceInterface source) {
NavigableSet<String> walsToRemove;
synchronized (this.latestPaths) {
walsToRemove = getWalsToRemove(source.getQueueId(), log, inclusive);
}
if (walsToRemove.isEmpty()) {
return;
}
// cleanOldLogs may spend some time, especially for sync replication where we may want to
// remove remote wals as the remote cluster may already be down, so we do it outside
// the lock to avoid blocking preLogRoll
cleanOldLogs(walsToRemove, source);
}

private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
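
The comment in the new cleanOldLogs above — do the slow deletion outside the lock — is a snapshot-then-release pattern. A generic sketch under invented names (the real code snapshots via getWalsToRemove while holding latestPaths):

import java.util.NavigableSet;
import java.util.TreeSet;

class SnapshotThenReleaseSketch {
  private final Object lock = new Object();                        // plays the role of latestPaths
  private final NavigableSet<String> sharedWals = new TreeSet<>(); // hypothetical shared state

  void cleanOldLogsSketch(String log, boolean inclusive) {
    NavigableSet<String> toRemove;
    synchronized (lock) {
      // headSet returns a live view, so copy it while still holding the lock.
      toRemove = new TreeSet<>(sharedWals.headSet(log, inclusive));
    }
    // The slow part (e.g. removing remote wals) runs with no lock held,
    // so a concurrent preLogRoll is never blocked behind it.
    toRemove.forEach(wal -> System.out.println("deleting " + wal));
  }
}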
@@ -760,37 +684,6 @@ public void preLogRoll(Path newLog) throws IOException {
abortAndThrowIOExceptionWhenFail(
() -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
}

// synchronized on walsById to avoid race with cleanOldLogs
synchronized (this.walsById) {
// Update walsById map
for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
.entrySet()) {
String peerId = entry.getKey();
Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
boolean existingPrefix = false;
for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
SortedSet<String> wals = walsEntry.getValue();
if (this.sources.isEmpty()) {
// If there's no slaves, don't need to keep the old wals since
// we only consider the last one when a new slave comes in
wals.clear();
}
if (logPrefix.equals(walsEntry.getKey())) {
wals.add(logName);
existingPrefix = true;
}
}
if (!existingPrefix) {
// The new log belongs to a new group, add it into this peer
LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
NavigableSet<String> wals = new TreeSet<>();
wals.add(logName);
walsByPrefix.put(logPrefix, wals);
}
}
}

// Add to latestPaths
latestPaths.put(logPrefix, newLog);
}
@@ -962,18 +855,6 @@ public void run() {
continue;
}
}
// track sources in walsByIdRecoveredQueues
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(queueId, walsByGroup);
for (String wal : walsSet) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
NavigableSet<String> wals = walsByGroup.get(walPrefix);
if (wals == null) {
wals = new TreeSet<>();
walsByGroup.put(walPrefix, wals);
}
wals.add(wal);
}
oldsources.add(src);
LOG.trace("Added source for recovered queue: " + src.getQueueId());
for (String wal : walsSet) {
Expand Down Expand Up @@ -1005,7 +886,18 @@ public void join() {
* @return a sorted set of wal names
*/
@VisibleForTesting
public Map<String, Map<String, NavigableSet<String>>> getWALs() {
public Map<String, Map<String, NavigableSet<String>>> getWALs()
throws ReplicationException {
Map<String, Map<String, NavigableSet<String>>> walsById = new HashMap<>();
for (ReplicationSourceInterface source : sources.values()) {
String queueId = source.getQueueId();
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
walsById.put(queueId, walsByGroup);
for (String wal : this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
}
}
return Collections.unmodifiableMap(walsById);
}
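
A hypothetical caller's view of the rebuilt accessor: the returned shape is unchanged (queue id -> wal group prefix -> sorted WAL names), but it is now computed on demand from the queue storage instead of the deleted cache, which is why it can throw ReplicationException:

import java.util.Map;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.replication.ReplicationException;

class GetWalsUsageSketch {
  // Hypothetical usage; manager is the ReplicationSourceManager above.
  static void dumpWals(ReplicationSourceManager manager) {
    try {
      Map<String, Map<String, NavigableSet<String>>> wals = manager.getWALs();
      wals.forEach((queueId, byGroup) ->
          byGroup.forEach((prefix, names) ->
              System.out.println(queueId + "/" + prefix + " -> " + names)));
    } catch (ReplicationException e) {
      // Reading from queue storage can now fail, unlike the old cache read.
      System.err.println("Failed to list WALs: " + e.getMessage());
    }
  }
}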

@@ -1014,7 +906,18 @@ public Map<String, Map<String, NavigableSet<String>>> getWALs() {
* @return a sorted set of wal names
*/
@VisibleForTesting
Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues()
throws ReplicationException {
Map<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues = new HashMap<>();
for (ReplicationSourceInterface source : oldsources) {
String queueId = source.getQueueId();
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(queueId, walsByGroup);
for (String wal : this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
}
}
return Collections.unmodifiableMap(walsByIdRecoveredQueues);
}

@@ -1177,4 +1080,21 @@ public void cleanUpHFileRefs(String peerId, List<String> files) {
int activeFailoverTaskCount() {
return executor.getActiveCount();
}

private NavigableSet<String> getWalsToRemove(String queueId, String log, boolean inclusive) {
NavigableSet<String> walsToRemove = new TreeSet<>();
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
try {
this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId).forEach(wal -> {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
if (walPrefix.equals(logPrefix)) {
walsToRemove.add(wal);
}
});
} catch (ReplicationException e) {
// Just log the exception here, as the recovered replication source will try to clean up again.
LOG.warn("Failed to read wals in queue {}", queueId, e);
}
return walsToRemove.headSet(log, inclusive);
}
}
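
Finally, a tiny standalone illustration of the headSet(log, inclusive) call that getWalsToRemove ends with: on a sorted set of WAL names it selects everything ordered at or before the given log, which is exactly the set of old logs to clean:

import java.util.NavigableSet;
import java.util.TreeSet;

class HeadSetSketch {
  public static void main(String[] args) {
    NavigableSet<String> wals = new TreeSet<>();
    wals.add("rs1.100");
    wals.add("rs1.200");
    wals.add("rs1.300");
    System.out.println(wals.headSet("rs1.200", true));  // [rs1.100, rs1.200]
    System.out.println(wals.headSet("rs1.200", false)); // [rs1.100]
  }
}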
