diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0cf6de4ec34b..d01f357d089a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -801,7 +801,7 @@ protected void initializeZKBasedSystemTrackers()
     this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
     this.splitOrMergeTracker.start();
 
-    this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf, clusterId);
+    this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
 
     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
     this.drainingServerTracker.start();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 0823ac0c16d2..29a8a1b18f8e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -31,7 +31,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
@@ -46,11 +45,9 @@
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * Manages and performs all replication admin operations.
@@ -66,17 +63,11 @@ public class ReplicationPeerManager {
 
   private final ConcurrentMap<String, ReplicationPeerDescription> peers;
 
-  private final String clusterId;
-
-  private final Configuration conf;
-
   ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
-      ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
+      ConcurrentMap<String, ReplicationPeerDescription> peers) {
     this.peerStorage = peerStorage;
     this.queueStorage = queueStorage;
     this.peers = peers;
-    this.conf = conf;
-    this.clusterId = clusterId;
   }
 
   private void checkQueuesDeleted(String peerId)
@@ -254,10 +245,11 @@ void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
 
   private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
     String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
-    ReplicationEndpoint endpoint = null;
+    boolean checkClusterKey = true;
     if (!StringUtils.isBlank(replicationEndpointImpl)) {
+      // try creating a instance
+      ReplicationEndpoint endpoint;
       try {
-        // try creating a instance
         endpoint = Class.forName(replicationEndpointImpl)
           .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
       } catch (Throwable e) {
@@ -265,15 +257,14 @@ private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetry
           "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
           e);
       }
+      // do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
+      if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
+        checkClusterKey = false;
+      }
     }
-    // Default is HBaseInterClusterReplicationEndpoint and only it need to check cluster key
-    if (endpoint == null || endpoint instanceof HBaseInterClusterReplicationEndpoint) {
+    if (checkClusterKey) {
       checkClusterKey(peerConfig.getClusterKey());
     }
-    // Default is HBaseInterClusterReplicationEndpoint which cannot replicate to same cluster
-    if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
-      checkClusterId(peerConfig.getClusterKey());
-    }
 
     if (peerConfig.replicateAllUserTables()) {
       // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
@@ -366,25 +357,6 @@ private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
     }
   }
 
-  private void checkClusterId(String clusterKey) throws DoNotRetryIOException {
-    String peerClusterId = "";
-    try {
-      // Create the peer cluster config for get peer cluster id
-      Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
-      try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
-        peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
-      }
-    } catch (IOException | KeeperException e) {
-      throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
-    }
-    // In rare case, zookeeper setting may be messed up. That leads to the incorrect
-    // peerClusterId value, which is the same as the source clusterId
-    if (clusterId.equals(peerClusterId)) {
-      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
-        + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
-    }
-  }
-
   public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
     return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
       .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
@@ -395,7 +367,7 @@ public ReplicationQueueStorage getQueueStorage() {
     return queueStorage;
   }
 
-  public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId)
+  public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
       throws ReplicationException {
     ReplicationPeerStorage peerStorage =
       ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
@@ -406,7 +378,7 @@ public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, St
       peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
     }
     return new ReplicationPeerManager(peerStorage,
-      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
+      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index c283b5c706ba..bf6ab7c7a433 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -524,6 +524,16 @@ private void initialize() {
     if (!this.isSourceActive()) {
       return;
     }
+
+    // In rare case, zookeeper setting may be messed up. That leads to the incorrect
+    // peerClusterId value, which is the same as the source clusterId
+    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
+      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
+          + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.removeSource(this);
+      return;
+    }
 
     LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
       logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index 966b11936553..b68ca01e7b63 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -71,9 +71,9 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
     HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
 
   private final String ID_ONE = "1";
-  private static String KEY_ONE;
+  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
   private final String ID_TWO = "2";
-  private static String KEY_TWO;
+  private final String KEY_TWO = "127.0.0.1:2181:/hbase2";
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -82,8 +82,6 @@ public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
     TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
     TEST_UTIL.startMiniCluster();
-    KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
-    KEY_TWO = TEST_UTIL.getClusterKey() + "-test2";
     ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
index 8c5c78cec437..04cf392a7106 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
@@ -113,11 +113,6 @@ protected void doStart() {
     protected void doStop() {
       notifyStopped();
     }
-
-    @Override
-    public boolean canReplicateToSameCluster() {
-      return true;
-    }
   }
 
   @BeforeClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 3be66e5bb474..37ca7dc830ff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -25,6 +25,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
@@ -33,14 +34,17 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerMetrics;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -68,7 +72,9 @@
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -170,16 +176,40 @@ public void testCyclicReplication1() throws Exception {
 
   /**
    * Tests the replication scenario 0 -> 0. By default
-   * {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint},
-   * the replication peer should not be added.
+   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
+   * ReplicationSource should terminate, and no further logs should get enqueued
    */
-  @Test(expected = DoNotRetryIOException.class)
-  public void testLoopedReplication()
-      throws Exception {
+  @Test
+  public void testLoopedReplication() throws Exception {
     LOG.info("testLoopedReplication");
     startMiniClusters(1);
     createTableOnClusters(table);
     addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // wait for source to terminate
+    final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ClusterMetrics clusterStatus = utilities[0].getAdmin()
+            .getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
+        ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(rsName);
+        List<ReplicationLoadSource> replicationLoadSourceList =
+            serverLoad.getReplicationLoadSourceList();
+        return replicationLoadSourceList.isEmpty();
+      }
+    });
+
+    Table[] htables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, htables[0], htables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+    ZKWatcher zkw = utilities[0].getZooKeeperWatcher();
+    String queuesZnode = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode,
+        ZNodePaths.joinZNode("replication", "rs"));
+    List<String> listChildrenNoWatch =
+        ZKUtil.listChildrenNoWatch(zkw, ZNodePaths.joinZNode(queuesZnode, rsName.toString()));
+    assertEquals(0, listChildrenNoWatch.size());
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 6fb24e070cb0..642139c177a6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -500,11 +500,6 @@ protected void doStop() {
       stoppedCount.incrementAndGet();
       notifyStopped();
     }
-
-    @Override
-    public boolean canReplicateToSameCluster() {
-      return true;
-    }
   }
 
   public static class InterClusterReplicationEndpointForTest
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
index 522fb20543b0..bd800a841f8e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
@@ -127,11 +127,6 @@ protected void doStart() {
     protected void doStop() {
      notifyStopped();
     }
-
-    @Override
-    public boolean canReplicateToSameCluster() {
-      return true;
-    }
   }
 
   @BeforeClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
index 271609dd7bd3..375f2ed4bc9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
@@ -174,9 +174,8 @@ public void testCleanReplicationBarrierWithExistTable() throws Exception {
   }
 
   public static void createPeer() throws IOException {
-    ReplicationPeerConfig rpc =
-      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
-        .setSerial(true).build();
+    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+      .setClusterKey(UTIL.getClusterKey()).setSerial(true).build();
     UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
     UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
   }
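Illustrative sketch (not part of the patch): with the same-cluster check now living in ReplicationSource.initialize(), an endpoint that genuinely needs to ship edits back into its own cluster opts in by overriding canReplicateToSameCluster(), the way the test endpoints above used to. The class below is a minimal, hypothetical example modeled on those test endpoints; the class name, random peer UUID, and no-op replicate() are assumptions for illustration only.

// Hypothetical example class, not part of this patch.
package org.apache.hadoop.hbase.replication;

import java.util.UUID;

public class SameClusterEndpointSketch extends BaseReplicationEndpoint {

  // Assumed: a fixed random UUID stands in for a real peer cluster id.
  private final UUID peerUUID = UUID.randomUUID();

  @Override
  public UUID getPeerUUID() {
    return peerUUID;
  }

  @Override
  public boolean canReplicateToSameCluster() {
    // ReplicationSource.initialize() only terminates the source when the peer
    // cluster id equals the local cluster id AND this method returns false,
    // so returning true opts this endpoint in to same-cluster replication.
    return true;
  }

  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    // A real endpoint would hand replicateContext.getEntries() to its sink here.
    return true;
  }

  @Override
  protected void doStart() {
    notifyStarted();
  }

  @Override
  protected void doStop() {
    notifyStopped();
  }
}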