Skip to content

Commit

Permalink
HBASE-25583: Handle the running replication source gracefully with re…
Browse files Browse the repository at this point in the history
…plication nodes deleted (#2960)

Signed-off-by: Wellington Chevreuil <[email protected]>
Signed-off-by: Andrew Purtell <[email protected]>
  • Loading branch information
sandeepvinayak authored Feb 19, 2021
1 parent 2d26c94 commit f9a9148
Show file tree
Hide file tree
Showing 14 changed files with 594 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public interface ReplicationQueues {
* @param queueId a String that identifies the queue.
* @param filename name of the WAL
*/
void removeLog(String queueId, String filename);
void removeLog(String queueId, String filename) throws ReplicationSourceWithoutPeerException;

/**
* Set the current position for a specific WAL in a given queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

/**
Expand Down Expand Up @@ -132,17 +132,44 @@ public void addLog(String queueId, String filename) throws ReplicationException
}

@Override
public void removeLog(String queueId, String filename) {
public void removeLog(String queueId, String filename)
throws ReplicationSourceWithoutPeerException {
try {
String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
znode = ZKUtil.joinZNode(znode, filename);
ZKUtil.deleteNode(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
+ filename + ")", e);
try {
String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
znode = ZKUtil.joinZNode(znode, filename);
ZKUtil.deleteNode(this.zookeeper, znode);
} catch (KeeperException.NoNodeException e) {
// in case of no node exception we should not crash the region server
// but instead check if the replication peer has been removed.
// If so, we can throw here so that the source can terminate itself.
// This situation can occur when the replication peer znodes has been
// removed but the sources not terminated due to any miss from zk node delete watcher.
if (!doesPeerExist(queueId)) {
LOG.warn("Replication peer " + queueId + " has been removed", e);
throw new ReplicationSourceWithoutPeerException(
"Znodes for peer has been delete while a source is still active", e);
} else {
throw e;
}
}
} catch (KeeperException ke) {
this.abortable.abort(
"Failed to remove wal from queue (queueId=" + queueId + ", filename=" + filename + ")", ke);
}
}

private boolean doesPeerExist(String queueId) throws KeeperException {
String peerId = queueId;
if (peerId.contains("-")) {
// queueId will be in the form peerId + "-" + rsZNode.
// A peerId will not have "-" in its name, see HBASE-11394
peerId = queueId.split("-")[0];
}

return peerExists(peerId);
}

@Override
public void setLogPosition(String queueId, String filename, long position) {
try {
Expand Down Expand Up @@ -426,8 +453,9 @@ private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String
// add delete op for peer
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));

if (LOG.isTraceEnabled())
if (LOG.isTraceEnabled()) {
LOG.trace(" The multi list size is: " + listOfOps.size());
}
}
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);

Expand Down Expand Up @@ -506,7 +534,7 @@ private Pair<String, SortedSet<String>> copyQueueFromLockedRS(String znode, Stri
}

/**
* @param lockOwner
* @param lockOwner lock owner
* @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix prepended suitable
* for use as content of an replication lock during region server fail over.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import org.apache.hadoop.hbase.classification.InterfaceAudience;

/**
* This exception is thrown when the replication source is running with no
* corresponding peer. This can due to race condition between PeersWatcher
* zk listerner and source trying to remove the queues, or if zk listener for
* delete node was never invoked for any reason. See HBASE-25583
*/
@InterfaceAudience.Private
public class ReplicationSourceWithoutPeerException extends ReplicationException {
private static final long serialVersionUID = 1L;

public ReplicationSourceWithoutPeerException(String m, Throwable t) {
super(m, t);
}

public ReplicationSourceWithoutPeerException(String m) {
super(m);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
Expand Down Expand Up @@ -158,7 +159,7 @@ public enum WorkerState {
* @param clusterId unique UUID for the cluster
* @param replicationEndpoint the replication endpoint implementation
* @param metrics metrics for replication source
* @throws IOException
* @throws IOException IO Exception
*/
@Override
public void init(final Configuration conf, final FileSystem fs,
Expand Down Expand Up @@ -441,7 +442,9 @@ public String getPeerClusterId() {
@Override
public Path getCurrentPath() {
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
if (worker.getCurrentPath() != null) return worker.getCurrentPath();
if (worker.getCurrentPath() != null) {
return worker.getCurrentPath();
}
}
return null;
}
Expand All @@ -460,7 +463,7 @@ public long getLastLoggedPosition() {
return 0;
}

private boolean isSourceActive() {
public boolean isSourceActive() {
return !this.stopper.isStopped() && this.sourceRunning;
}

Expand Down Expand Up @@ -792,9 +795,13 @@ protected void shipEdits(WALEntryBatch entryBatch) {
}

private void updateLogPosition(long lastReadPosition) {
manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
this.replicationQueueInfo.isQueueRecovered(), false);
lastLoggedPosition = lastReadPosition;
try {
manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
this.replicationQueueInfo.isQueueRecovered(), false);
lastLoggedPosition = lastReadPosition;
} catch (ReplicationSourceWithoutPeerException re) {
source.terminate("Replication peer is removed and source should terminate", re);
}
}

public void startup() {
Expand Down Expand Up @@ -976,7 +983,7 @@ private void terminate(String reason, Exception cause) {

/**
* Set the worker state
* @param state
* @param state the state of the wal reader
*/
public void setWorkerState(WorkerState state) {
this.state = state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
Expand Down Expand Up @@ -125,14 +126,14 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param replicationQueues the interface for manipulating replication queues
* @param replicationPeers
* @param replicationTracker
* @param replicationPeers the replication peers maintenance class
* @param replicationTracker the replication tracker to track the states
* @param conf the configuration to use
* @param server the server for this region server
* @param fs the file system to use
* @param logDir the directory that contains all wal directories of live RSs
* @param oldLogDir the directory where old logs are archived
* @param clusterId
* @param clusterId the cluster id of the source cluster
*/
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
Expand Down Expand Up @@ -181,14 +182,14 @@ public ReplicationSourceManager(final ReplicationQueues replicationQueues,
* wal it belongs to and will log, for this region server, the current
* position. It will also clean old logs from the queue.
* @param log Path to the log currently being replicated from
* replication status in zookeeper. It will also delete older entries.
* replication status in zookeeper. It will also delete older entries.
* @param id id of the peer cluster
* @param position current location in the log
* @param queueRecovered indicates if this queue comes from another region server
* @param holdLogInZK if true then the log is retained in ZK
*/
public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
boolean queueRecovered, boolean holdLogInZK) {
boolean queueRecovered, boolean holdLogInZK) throws ReplicationSourceWithoutPeerException {
String fileName = log.getName();
this.replicationQueues.setLogPosition(id, fileName, position);
if (holdLogInZK) {
Expand All @@ -204,7 +205,8 @@ public synchronized void logPositionAndCleanOldLogs(Path log, String id, long po
* @param id id of the peer cluster
* @param queueRecovered Whether this is a recovered queue
*/
public void cleanOldLogs(String key, String id, boolean queueRecovered) {
public void cleanOldLogs(String key, String id, boolean queueRecovered)
throws ReplicationSourceWithoutPeerException {
String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
if (queueRecovered) {
Map<String, SortedSet<String>> walsForPeer = walsByIdRecoveredQueues.get(id);
Expand All @@ -222,9 +224,10 @@ public void cleanOldLogs(String key, String id, boolean queueRecovered) {
}
}
}
}
}

private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
private void cleanOldLogs(SortedSet<String> wals, String key, String id)
throws ReplicationSourceWithoutPeerException {
SortedSet<String> walSet = wals.headSet(key);
LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
for (String wal : walSet) {
Expand Down Expand Up @@ -267,7 +270,7 @@ protected void init() throws IOException, ReplicationException {
* need to enqueue the latest log of each wal group and do replication
* @param id the id of the peer cluster
* @return the source that was created
* @throws IOException
* @throws IOException IO Exception
*/
protected ReplicationSourceInterface addSource(String id) throws IOException,
ReplicationException {
Expand Down Expand Up @@ -365,7 +368,7 @@ public List<ReplicationSourceInterface> getOldSources() {

/**
* Get the normal source for a given peer
* @param peerId
* @param peerId the replication peer Id
* @return the normal source for the give peer if it exists, otherwise null.
*/
public ReplicationSourceInterface getSource(String peerId) {
Expand Down Expand Up @@ -402,7 +405,7 @@ void preLogRoll(Path newLog) throws IOException {
* Check and enqueue the given log to the correct source. If there's still no source for the
* group to which the given log belongs, create one
* @param logPath the log path to check and enqueue
* @throws IOException
* @throws IOException IO Exception
*/
private void recordLog(Path logPath) throws IOException {
String logName = logPath.getName();
Expand Down Expand Up @@ -467,7 +470,7 @@ public AtomicLong getTotalBufferUsed() {
* @param server the server object for this region server
* @param peerId the id of the peer cluster
* @return the created source
* @throws IOException
* @throws IOException IO Exception
*/
protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
final FileSystem fs, final ReplicationSourceManager manager,
Expand Down Expand Up @@ -523,7 +526,8 @@ protected ReplicationSourceInterface getReplicationSource(final Configuration co
clusterId, replicationEndpoint, metrics);

// init replication endpoint
replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(),
replicationEndpoint.init(new ReplicationEndpoint.Context(
conf, replicationPeer.getConfiguration(),
fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));

return src;
Expand All @@ -535,7 +539,7 @@ protected ReplicationSourceInterface getReplicationSource(final Configuration co
* znodes and finally will delete the old znodes.
*
* It creates one old source for any type of source of the old rs.
* @param rsZnode
* @param rsZnode znode for region server from where to transfer the queues
*/
private void transferQueues(String rsZnode) {
NodeFailoverWorker transfer =
Expand Down Expand Up @@ -664,7 +668,7 @@ class NodeFailoverWorker extends Thread {
private final UUID clusterId;

/**
* @param rsZnode
* @param rsZnode znode for dead region server
*/
public NodeFailoverWorker(String rsZnode) {
this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
Expand Down Expand Up @@ -820,7 +824,9 @@ public FileSystem getFs() {
* Get the ReplicationPeers used by this ReplicationSourceManager
* @return the ReplicationPeers used by this ReplicationSourceManager
*/
public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
public ReplicationPeers getReplicationPeers() {
return this.replicationPeers;
}

/**
* Get a string representation of all the sources' metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
String peerClusterId;
Path currentPath;
MetricsSource metrics;
public static final String fakeExceptionMessage = "Fake Exception";

@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

public class ReplicationSourceDummyWithNoTermination extends ReplicationSourceDummy {

@Override
public void terminate(String reason) {
// This is to block the zk listener to close the queues
// to simulate the znodes getting deleted without zk listener getting invoked
throw new RuntimeException(fakeExceptionMessage);
}
}
Loading

0 comments on commit f9a9148

Please sign in to comment.