HBASE-24743 Reject to add a peer which replicate to itself earlier (#2071)

Signed-off-by: Wellington Chevreuil <[email protected]>
Signed-off-by: Bharath Vissapragada <[email protected]>
infraio authored Jul 21, 2020
1 parent 7ebc617 commit 6cf013d
Showing 10 changed files with 69 additions and 63 deletions.
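The practical effect of this change is that a looped replication peer is rejected up front, when the peer is added, instead of being detected later by the ReplicationSource and terminated. Below is a minimal sketch of that operator-facing behavior; the class name, peer id, and cluster key are illustrative and not part of this commit.

import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

public class AddLoopedPeerExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = conn.getAdmin()) {
      // Cluster key that points back at this same cluster (illustrative value).
      ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
          .setClusterKey("localhost:2181:/hbase").build();
      try {
        admin.addReplicationPeer("selfPeer", rpc);
      } catch (DoNotRetryIOException e) {
        // Expected after HBASE-24743: the master reads the peer cluster id from the
        // peer's ZooKeeper, sees it equals its own cluster id, and rejects the peer.
        System.out.println("Peer rejected: " + e.getMessage());
      }
    }
  }
}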
@@ -822,7 +822,7 @@ protected void initializeZKBasedSystemTrackers()
}
this.rsGroupInfoManager = RSGroupInfoManager.create(this);

this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf, clusterId);

this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
this.drainingServerTracker.start();
@@ -35,6 +35,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
@@ -50,9 +51,11 @@
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
@@ -81,11 +84,17 @@ public class ReplicationPeerManager {
// Only allow to add one sync replication peer concurrently
private final Semaphore syncReplicationPeerLock = new Semaphore(1);

private final String clusterId;

private final Configuration conf;

ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
ConcurrentMap<String, ReplicationPeerDescription> peers) {
ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
this.peerStorage = peerStorage;
this.queueStorage = queueStorage;
this.peers = peers;
this.conf = conf;
this.clusterId = clusterId;
}

private void checkQueuesDeleted(String peerId)
@@ -337,26 +346,26 @@ public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException

private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
boolean checkClusterKey = true;
ReplicationEndpoint endpoint = null;
if (!StringUtils.isBlank(replicationEndpointImpl)) {
// try creating a instance
ReplicationEndpoint endpoint;
try {
// try creating a instance
endpoint = Class.forName(replicationEndpointImpl)
.asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
} catch (Throwable e) {
throw new DoNotRetryIOException(
"Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
e);
}
// do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
checkClusterKey = false;
}
}
if (checkClusterKey) {
// Default is HBaseInterClusterReplicationEndpoint and only it need to check cluster key
if (endpoint == null || endpoint instanceof HBaseInterClusterReplicationEndpoint) {
checkClusterKey(peerConfig.getClusterKey());
}
// Default is HBaseInterClusterReplicationEndpoint which cannot replicate to same cluster
if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
checkClusterId(peerConfig.getClusterKey());
}

if (peerConfig.replicateAllUserTables()) {
// If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
@@ -501,6 +510,25 @@ private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
}
}

private void checkClusterId(String clusterKey) throws DoNotRetryIOException {
String peerClusterId = "";
try {
// Create the peer cluster config for get peer cluster id
Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
}
} catch (IOException | KeeperException e) {
throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
}
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
if (clusterId.equals(peerClusterId)) {
throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
+ ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
}
}

public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
.filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
@@ -511,7 +539,7 @@ public ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}

public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId)
throws ReplicationException {
ReplicationPeerStorage peerStorage =
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
@@ -523,7 +551,7 @@ public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
}
return new ReplicationPeerManager(peerStorage,
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
}

/**
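Note that the new checkClusterId call is skipped whenever the peer's endpoint reports canReplicateToSameCluster() as true, which is exactly how the test endpoints later in this diff opt out. A minimal sketch of such an override follows; the class name is hypothetical and the chosen base class is only one possible option.

import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;

// Hypothetical endpoint: because canReplicateToSameCluster() returns true,
// checkPeerConfig skips the new checkClusterId validation, so a peer configured
// with this endpoint is allowed to point back at the source cluster.
public class LoopbackTolerantEndpoint extends HBaseInterClusterReplicationEndpoint {
  @Override
  public boolean canReplicateToSameCluster() {
    return true;
  }
}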
@@ -523,16 +523,6 @@ private void initialize() {
if(!this.isSourceActive()) {
return;
}

// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
this.manager.removeSource(this);
return;
}
LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);

@@ -71,9 +71,9 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);

private final String ID_ONE = "1";
private final String KEY_ONE = "127.0.0.1:2181:/hbase";
private static String KEY_ONE;
private final String ID_TWO = "2";
private final String KEY_TWO = "127.0.0.1:2181:/hbase2";
private static String KEY_TWO;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -82,6 +82,8 @@ public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
TEST_UTIL.startMiniCluster();
KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
KEY_TWO = TEST_UTIL.getClusterKey() + "-test2";
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}

@@ -81,7 +81,7 @@ public void testAddPeerWithSameTable() throws Exception {
Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
String peerId = "id" + i;
String clusterKey = "127.0.0.1:2181:/hbase" + i;
String clusterKey = TEST_UTIL.getClusterKey() + "-test" + i;
int index = i;
threads[i] = new Thread(() -> {
try {
@@ -113,6 +113,11 @@ protected void doStart() {
protected void doStop() {
notifyStopped();
}

@Override
public boolean canReplicateToSameCluster() {
return true;
}
}

@BeforeClass
@@ -25,7 +25,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
@@ -34,17 +33,14 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -72,9 +68,7 @@
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@@ -176,40 +170,16 @@ public void testCyclicReplication1() throws Exception {

/**
* Tests the replication scenario 0 -> 0. By default
* {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
* ReplicationSource should terminate, and no further logs should get enqueued
* {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint},
* the replication peer should not be added.
*/
@Test
public void testLoopedReplication() throws Exception {
@Test(expected = DoNotRetryIOException.class)
public void testLoopedReplication()
throws Exception {
LOG.info("testLoopedReplication");
startMiniClusters(1);
createTableOnClusters(table);
addPeer("1", 0, 0);
Thread.sleep(SLEEP_TIME);

// wait for source to terminate
final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
ClusterMetrics clusterStatus = utilities[0].getAdmin()
.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(rsName);
List<ReplicationLoadSource> replicationLoadSourceList =
serverLoad.getReplicationLoadSourceList();
return replicationLoadSourceList.isEmpty();
}
});

Table[] htables = getHTablesOnClusters(tableName);
putAndWait(row, famName, htables[0], htables[0]);
rollWALAndWait(utilities[0], table.getTableName(), row);
ZKWatcher zkw = utilities[0].getZooKeeperWatcher();
String queuesZnode = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode,
ZNodePaths.joinZNode("replication", "rs"));
List<String> listChildrenNoWatch =
ZKUtil.listChildrenNoWatch(zkw, ZNodePaths.joinZNode(queuesZnode, rsName.toString()));
assertEquals(0, listChildrenNoWatch.size());
}

/**
@@ -490,6 +490,11 @@ protected void doStop() {
stoppedCount.incrementAndGet();
notifyStopped();
}

@Override
public boolean canReplicateToSameCluster() {
return true;
}
}

public static class InterClusterReplicationEndpointForTest
@@ -127,6 +127,11 @@ protected void doStart() {
protected void doStop() {
notifyStopped();
}

@Override
public boolean canReplicateToSameCluster() {
return true;
}
}

@BeforeClass
@@ -174,8 +174,9 @@ public void testCleanReplicationBarrierWithExistTable() throws Exception {
}

public static void createPeer() throws IOException {
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL.getClusterKey()).setSerial(true).build();
ReplicationPeerConfig rpc =
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
.setSerial(true).build();
UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
}