Skip to content

Commit

Permalink
Revert "HBASE-24743 Reject to add a peer which replicate to itself ea…
Browse files Browse the repository at this point in the history
…rlier (#2071)"

This reverts commit 5db3ec2.

TestReplicationAdmin and TestReplicationShell are broken on branch-2 and master respectively
  • Loading branch information
virajjasani committed Jul 21, 2020
1 parent 3c91c33 commit 5bb76bf
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ protected void initializeZKBasedSystemTrackers()
this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
this.splitOrMergeTracker.start();

this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf, clusterId);
this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);

this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
this.drainingServerTracker.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
Expand All @@ -46,11 +45,9 @@
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;

/**
* Manages and performs all replication admin operations.
Expand All @@ -66,17 +63,11 @@ public class ReplicationPeerManager {

private final ConcurrentMap<String, ReplicationPeerDescription> peers;

private final String clusterId;

private final Configuration conf;

ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
ConcurrentMap<String, ReplicationPeerDescription> peers) {
this.peerStorage = peerStorage;
this.queueStorage = queueStorage;
this.peers = peers;
this.conf = conf;
this.clusterId = clusterId;
}

private void checkQueuesDeleted(String peerId)
Expand Down Expand Up @@ -254,26 +245,26 @@ void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {

private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
ReplicationEndpoint endpoint = null;
boolean checkClusterKey = true;
if (!StringUtils.isBlank(replicationEndpointImpl)) {
// try creating a instance
ReplicationEndpoint endpoint;
try {
// try creating a instance
endpoint = Class.forName(replicationEndpointImpl)
.asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
} catch (Throwable e) {
throw new DoNotRetryIOException(
"Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
e);
}
// do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
checkClusterKey = false;
}
}
// Default is HBaseInterClusterReplicationEndpoint and only it need to check cluster key
if (endpoint == null || endpoint instanceof HBaseInterClusterReplicationEndpoint) {
if (checkClusterKey) {
checkClusterKey(peerConfig.getClusterKey());
}
// Default is HBaseInterClusterReplicationEndpoint which cannot replicate to same cluster
if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
checkClusterId(peerConfig.getClusterKey());
}

if (peerConfig.replicateAllUserTables()) {
// If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
Expand Down Expand Up @@ -366,25 +357,6 @@ private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
}
}

private void checkClusterId(String clusterKey) throws DoNotRetryIOException {
String peerClusterId = "";
try {
// Create the peer cluster config for get peer cluster id
Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
}
} catch (IOException | KeeperException e) {
throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
}
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
if (clusterId.equals(peerClusterId)) {
throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
+ ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
}
}

public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
.filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
Expand All @@ -395,7 +367,7 @@ public ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}

public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId)
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
throws ReplicationException {
ReplicationPeerStorage peerStorage =
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
Expand All @@ -406,7 +378,7 @@ public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, St
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
}
return new ReplicationPeerManager(peerStorage,
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,16 @@ private void initialize() {
if (!this.isSourceActive()) {
return;
}

// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
this.manager.removeSource(this);
return;
}
LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);

private final String ID_ONE = "1";
private static String KEY_ONE;
private final String KEY_ONE = "127.0.0.1:2181:/hbase";
private final String ID_TWO = "2";
private static String KEY_TWO;
private final String KEY_TWO = "127.0.0.1:2181:/hbase2";

@BeforeClass
public static void setUpBeforeClass() throws Exception {
Expand All @@ -82,8 +82,6 @@ public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
TEST_UTIL.startMiniCluster();
KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
KEY_TWO = TEST_UTIL.getClusterKey() + "-test2";
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,6 @@ protected void doStart() {
protected void doStop() {
notifyStopped();
}

@Override
public boolean canReplicateToSameCluster() {
return true;
}
}

@BeforeClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
Expand All @@ -33,14 +34,17 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
Expand Down Expand Up @@ -68,7 +72,9 @@
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
Expand Down Expand Up @@ -170,16 +176,40 @@ public void testCyclicReplication1() throws Exception {

/**
* Tests the replication scenario 0 -> 0. By default
* {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint},
* the replication peer should not be added.
* {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
* ReplicationSource should terminate, and no further logs should get enqueued
*/
@Test(expected = DoNotRetryIOException.class)
public void testLoopedReplication()
throws Exception {
@Test
public void testLoopedReplication() throws Exception {
LOG.info("testLoopedReplication");
startMiniClusters(1);
createTableOnClusters(table);
addPeer("1", 0, 0);
Thread.sleep(SLEEP_TIME);

// wait for source to terminate
final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
ClusterMetrics clusterStatus = utilities[0].getAdmin()
.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(rsName);
List<ReplicationLoadSource> replicationLoadSourceList =
serverLoad.getReplicationLoadSourceList();
return replicationLoadSourceList.isEmpty();
}
});

Table[] htables = getHTablesOnClusters(tableName);
putAndWait(row, famName, htables[0], htables[0]);
rollWALAndWait(utilities[0], table.getTableName(), row);
ZKWatcher zkw = utilities[0].getZooKeeperWatcher();
String queuesZnode = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode,
ZNodePaths.joinZNode("replication", "rs"));
List<String> listChildrenNoWatch =
ZKUtil.listChildrenNoWatch(zkw, ZNodePaths.joinZNode(queuesZnode, rsName.toString()));
assertEquals(0, listChildrenNoWatch.size());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,11 +500,6 @@ protected void doStop() {
stoppedCount.incrementAndGet();
notifyStopped();
}

@Override
public boolean canReplicateToSameCluster() {
return true;
}
}

public static class InterClusterReplicationEndpointForTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,6 @@ protected void doStart() {
protected void doStop() {
notifyStopped();
}

@Override
public boolean canReplicateToSameCluster() {
return true;
}
}

@BeforeClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,8 @@ public void testCleanReplicationBarrierWithExistTable() throws Exception {
}

public static void createPeer() throws IOException {
ReplicationPeerConfig rpc =
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
.setSerial(true).build();
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL.getClusterKey()).setSerial(true).build();
UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
}
Expand Down

0 comments on commit 5bb76bf

Please sign in to comment.