Skip to content

Commit

Permalink
HBASE-27806 Support dynamic reinitializing replication peer storage
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Apr 21, 2023
1 parent 269586c commit 2cf1d5b
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class ReplicationPeerImpl implements ReplicationPeer {
public class ReplicationPeerImpl implements ReplicationPeer, ConfigurationObserver {

private final Configuration conf;
private volatile Configuration conf;

private final String id;

Expand Down Expand Up @@ -151,4 +152,9 @@ public long getPeerBandwidth() {
public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
this.peerConfigListeners.add(listener);
}

@Override
public void onConfigurationChange(Configuration conf) {
this.conf = conf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,38 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This provides an class for maintaining a set of peer clusters. These peers are remote slave
* clusters that data is replicated to.
* <p>
* We implement {@link ConfigurationObserver} mainly for recreating the
* {@link ReplicationPeerStorage}, so we can change the {@link ReplicationPeerStorage} without
* restarting the region server.
*/
@InterfaceAudience.Private
public class ReplicationPeers {
public class ReplicationPeers implements ConfigurationObserver {

private final Configuration conf;
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);

private volatile Configuration conf;

// Map of peer clusters keyed by their id
private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
private final ReplicationPeerStorage peerStorage;
private final FileSystem fs;
private final ZKWatcher zookeeper;
private volatile ReplicationPeerStorage peerStorage;

ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) {
this.conf = conf;
this.fs = fs;
this.zookeeper = zookeeper;
this.peerCache = new ConcurrentHashMap<>();
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
}
Expand Down Expand Up @@ -145,4 +158,18 @@ private ReplicationPeerImpl createPeer(String peerId) throws ReplicationExceptio
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState);
}

@Override
public void onConfigurationChange(Configuration conf) {
this.conf = conf;
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
for (ReplicationPeerImpl peer : peerCache.values()) {
try {
peer.onConfigurationChange(
ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf));
} catch (ReplicationException e) {
LOG.warn("failed to reload configuration for peer {}", peer.getId(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,12 @@ public MetaRegionLocationCache getMetaRegionLocationCache() {
return this.metaRegionLocationCache;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public ConfigurationManager getConfigurationManager() {
return configurationManager;
}

/**
* Reload the configuration from disk.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
Expand Down Expand Up @@ -791,6 +790,7 @@ private void initializeZKBasedSystemTrackers()

this.replicationPeerManager =
ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId);
this.configurationManager.registerObserver(replicationPeerManager);
this.replicationPeerModificationStateStore =
new ReplicationPeerModificationStateStore(masterRegion);

Expand Down Expand Up @@ -4293,12 +4293,6 @@ static void setDisableBalancerChoreForTest(boolean disable) {
disableBalancerChoreForTest = disable;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public ConfigurationManager getConfigurationManager() {
return configurationManager;
}

private void setQuotasObserver(Configuration conf) {
// Add the Observer to delete quotas on table deletion before starting all CPs by
// default with quota support, avoiding if user specifically asks to not load this Observer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
Expand Down Expand Up @@ -69,13 +70,16 @@
* Manages and performs all replication admin operations.
* <p>
* Used to add/remove a replication peer.
* <p>
* Implement {@link ConfigurationObserver} mainly for recreating {@link ReplicationPeerStorage}, for
* supporting migrating across different replication peer storages without restarting master.
*/
@InterfaceAudience.Private
public class ReplicationPeerManager {
public class ReplicationPeerManager implements ConfigurationObserver {

private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class);

private final ReplicationPeerStorage peerStorage;
private volatile ReplicationPeerStorage peerStorage;

private final ReplicationQueueStorage queueStorage;

Expand All @@ -94,10 +98,18 @@ public class ReplicationPeerManager {

private final String clusterId;

private final Configuration conf;
private volatile Configuration conf;

// for dynamic recreating ReplicationPeerStorage.
private final FileSystem fs;

private final ZKWatcher zk;

ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage,
ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers,
Configuration conf, String clusterId) {
this.fs = fs;
this.zk = zk;
this.peerStorage = peerStorage;
this.queueStorage = queueStorage;
this.peers = peers;
Expand Down Expand Up @@ -582,7 +594,7 @@ public static ReplicationPeerManager create(FileSystem fs, ZKWatcher zk, Configu
SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
}
return new ReplicationPeerManager(peerStorage,
return new ReplicationPeerManager(fs, zk, peerStorage,
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
}

Expand All @@ -604,4 +616,10 @@ public boolean tryAcquireSyncReplicationPeerLock() {
public void releaseSyncReplicationPeerLock() {
syncReplicationPeerLock.release();
}

@Override
public void onConfigurationChange(Configuration conf) {
this.conf = conf;
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
Expand Down Expand Up @@ -2065,6 +2065,14 @@ private void initializeThreads() {
}

private void registerConfigurationObservers() {
// Register Replication if possible, as now we support recreating replication peer storage, for
// migrating across different replication peer storages online
if (replicationSourceHandler instanceof ConfigurationObserver) {
configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
}
if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) {
configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
}
// Registering the compactSplitThread object with the ConfigurationManager.
configurationManager.registerObserver(this.compactSplitThread);
configurationManager.registerObserver(this.rpcServices);
Expand Down Expand Up @@ -3315,11 +3323,6 @@ public Optional<MobFileCache> getMobFileCache() {
return Optional.ofNullable(this.mobFileCache);
}

/** Returns : Returns the ConfigurationManager object for testing purposes. */
ConfigurationManager getConfigurationManager() {
return configurationManager;
}

CacheEvictionStats clearRegionBlockCache(Region region) {
long evictedBlocks = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
Expand All @@ -50,15 +52,18 @@

/**
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
* <p>
* Implement {@link PropagatingConfigurationObserver} mainly for registering
* {@link ReplicationPeers}, so we can recreating the replication peer storage.
*/
@InterfaceAudience.Private
public class Replication implements ReplicationSourceService {
public class Replication implements ReplicationSourceService, PropagatingConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
private boolean isReplicationForBulkLoadDataEnabled;
private ReplicationSourceManager replicationManager;
private ReplicationQueueStorage queueStorage;
private ReplicationPeers replicationPeers;
private Configuration conf;
private volatile Configuration conf;
private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
// Hosting server
private Server server;
Expand Down Expand Up @@ -229,4 +234,19 @@ public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
public ReplicationPeers getReplicationPeers() {
return replicationPeers;
}

@Override
public void onConfigurationChange(Configuration conf) {
this.conf = conf;
}

@Override
public void registerChildren(ConfigurationManager manager) {
manager.registerObserver(replicationPeers);
}

@Override
public void deregisterChildren(ConfigurationManager manager) {
manager.deregisterObserver(replicationPeers);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestMigrateRepliationPeerStorageOnline {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMigrateRepliationPeerStorageOnline.class);

private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

@BeforeClass
public static void setUp() throws Exception {
// use zookeeper first, and then migrate to filesystem
UTIL.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
ReplicationPeerStorageType.ZOOKEEPER.name());
UTIL.startMiniCluster(1);
}

@AfterClass
public static void tearDown() throws IOException {
UTIL.shutdownMiniCluster();
}

@Test
public void testMigrate() throws Exception {
Admin admin = UTIL.getAdmin();
ReplicationPeerConfig rpc =
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build();
admin.addReplicationPeer("1", rpc);

// disable peer modification
admin.replicationPeerModificationSwitch(false, true);

// migrate replication peer data
Configuration conf = new Configuration(UTIL.getConfiguration());
assertEquals(0, ToolRunner.run(conf, new CopyReplicationPeers(conf),
new String[] { "zookeeper", "filesystem" }));
conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
ReplicationPeerStorageType.FILESYSTEM.name());
// confirm that we have copied the data
ReplicationPeerStorage fsPeerStorage = ReplicationStorageFactory
.getReplicationPeerStorage(UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(), conf);
assertNotNull(fsPeerStorage.getPeerConfig("1"));

for (MasterThread mt : UTIL.getMiniHBaseCluster().getMasterThreads()) {
Configuration newConf = new Configuration(mt.getMaster().getConfiguration());
newConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
ReplicationPeerStorageType.FILESYSTEM.name());
mt.getMaster().getConfigurationManager().notifyAllObservers(newConf);
}
for (RegionServerThread rt : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
Configuration newConf = new Configuration(rt.getRegionServer().getConfiguration());
newConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
ReplicationPeerStorageType.FILESYSTEM.name());
rt.getRegionServer().getConfigurationManager().notifyAllObservers(newConf);
}

admin.replicationPeerModificationSwitch(true);
admin.removeReplicationPeer("1");

// confirm that we will operation on the new peer storage
assertThat(fsPeerStorage.listPeerIds(), empty());
}
}

0 comments on commit 2cf1d5b

Please sign in to comment.