diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java index cda7742bcbf6..2332cea66366 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java @@ -23,13 +23,14 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -public class ReplicationPeerImpl implements ReplicationPeer { +public class ReplicationPeerImpl implements ReplicationPeer, ConfigurationObserver { - private final Configuration conf; + private volatile Configuration conf; private final String id; @@ -151,4 +152,9 @@ public long getPeerBandwidth() { public void registerPeerConfigListener(ReplicationPeerConfigListener listener) { this.peerConfigListeners.add(listener); } + + @Override + public void onConfigurationChange(Configuration conf) { + this.conf = conf; + } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index a8f4a5efa54f..56b86a6f9d13 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -24,25 +24,38 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This provides an class for maintaining a set of peer clusters. These peers are remote slave * clusters that data is replicated to. + *
+ * We implement {@link ConfigurationObserver} mainly for recreating the
+ * {@link ReplicationPeerStorage}, so we can change the {@link ReplicationPeerStorage} without
+ * restarting the region server.
*/
@InterfaceAudience.Private
-public class ReplicationPeers {
+public class ReplicationPeers implements ConfigurationObserver {
- private final Configuration conf;
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
+
+ private volatile Configuration conf;
// Map of peer clusters keyed by their id
private final ConcurrentMap
* Used to add/remove a replication peer.
+ *
+ * Implement {@link ConfigurationObserver} mainly for recreating {@link ReplicationPeerStorage}, for
+ * supporting migrating across different replication peer storages without restarting master.
*/
@InterfaceAudience.Private
-public class ReplicationPeerManager {
+public class ReplicationPeerManager implements ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class);
- private final ReplicationPeerStorage peerStorage;
+ private volatile ReplicationPeerStorage peerStorage;
private final ReplicationQueueStorage queueStorage;
@@ -94,10 +98,18 @@ public class ReplicationPeerManager {
private final String clusterId;
- private final Configuration conf;
+ private volatile Configuration conf;
+
+ // for dynamic recreating ReplicationPeerStorage.
+ private final FileSystem fs;
+
+ private final ZKWatcher zk;
- ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
- ConcurrentMap
+ * Implement {@link PropagatingConfigurationObserver} mainly for registering
+ * {@link ReplicationPeers}, so we can recreating the replication peer storage.
*/
@InterfaceAudience.Private
-public class Replication implements ReplicationSourceService {
+public class Replication implements ReplicationSourceService, PropagatingConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
private boolean isReplicationForBulkLoadDataEnabled;
private ReplicationSourceManager replicationManager;
private ReplicationQueueStorage queueStorage;
private ReplicationPeers replicationPeers;
- private Configuration conf;
+ private volatile Configuration conf;
private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
// Hosting server
private Server server;
@@ -229,4 +234,19 @@ public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
public ReplicationPeers getReplicationPeers() {
return replicationPeers;
}
+
+ @Override
+ public void onConfigurationChange(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void registerChildren(ConfigurationManager manager) {
+ manager.registerObserver(replicationPeers);
+ }
+
+ @Override
+ public void deregisterChildren(ConfigurationManager manager) {
+ manager.deregisterObserver(replicationPeers);
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java
new file mode 100644
index 000000000000..a824dde42a4a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestMigrateRepliationPeerStorageOnline {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMigrateRepliationPeerStorageOnline.class);
+
+ private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ // use zookeeper first, and then migrate to filesystem
+ UTIL.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
+ ReplicationPeerStorageType.ZOOKEEPER.name());
+ UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testMigrate() throws Exception {
+ Admin admin = UTIL.getAdmin();
+ ReplicationPeerConfig rpc =
+ ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
+ .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build();
+ admin.addReplicationPeer("1", rpc);
+
+ // disable peer modification
+ admin.replicationPeerModificationSwitch(false, true);
+
+ // migrate replication peer data
+ Configuration conf = new Configuration(UTIL.getConfiguration());
+ assertEquals(0, ToolRunner.run(conf, new CopyReplicationPeers(conf),
+ new String[] { "zookeeper", "filesystem" }));
+ conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
+ ReplicationPeerStorageType.FILESYSTEM.name());
+ // confirm that we have copied the data
+ ReplicationPeerStorage fsPeerStorage = ReplicationStorageFactory
+ .getReplicationPeerStorage(UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(), conf);
+ assertNotNull(fsPeerStorage.getPeerConfig("1"));
+
+ for (MasterThread mt : UTIL.getMiniHBaseCluster().getMasterThreads()) {
+ Configuration newConf = new Configuration(mt.getMaster().getConfiguration());
+ newConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
+ ReplicationPeerStorageType.FILESYSTEM.name());
+ mt.getMaster().getConfigurationManager().notifyAllObservers(newConf);
+ }
+ for (RegionServerThread rt : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+ Configuration newConf = new Configuration(rt.getRegionServer().getConfiguration());
+ newConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
+ ReplicationPeerStorageType.FILESYSTEM.name());
+ rt.getRegionServer().getConfigurationManager().notifyAllObservers(newConf);
+ }
+
+ admin.replicationPeerModificationSwitch(true);
+ admin.removeReplicationPeer("1");
+
+ // confirm that we will operation on the new peer storage
+ assertThat(fsPeerStorage.listPeerIds(), empty());
+ }
+}