From 662f5e144d2591e27dadcdf46284b3266e92fc88 Mon Sep 17 00:00:00 2001 From: XinSun Date: Sun, 20 Sep 2020 10:54:43 +0800 Subject: [PATCH] =?UTF-8?q?HBASE-24684=20Fetch=20ReplicationSink=20servers?= =?UTF-8?q?=20list=20from=20HMaster=20instead=20o=E2=80=A6=20(#2077)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Wellington Chevreuil --- .../main/protobuf/server/master/Master.proto | 12 +- .../hbase/coprocessor/MasterObserver.java | 16 ++ .../apache/hadoop/hbase/master/HMaster.java | 5 + .../hbase/master/MasterCoprocessorHost.java | 18 +++ .../hbase/master/MasterRpcServices.java | 21 +++ .../hadoop/hbase/master/MasterServices.java | 6 + .../replication/HBaseReplicationEndpoint.java | 146 ++++++++++++++++-- .../regionserver/ReplicationSource.java | 4 +- .../ReplicationSourceShipper.java | 2 +- .../hbase/master/MockNoopMasterServices.java | 5 + .../TestHBaseReplicationEndpoint.java | 5 + .../TestReplicationFetchServers.java | 106 +++++++++++++ .../TestGlobalReplicationThrottler.java | 4 + ...ionReplicaReplicationEndpointNoMaster.java | 2 + .../regionserver/TestReplicationSource.java | 4 +- 15 files changed, 337 insertions(+), 19 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto index 118ce77b2183..7dec566b87d8 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto @@ -717,6 +717,13 @@ message BalancerDecisionsResponse { repeated BalancerDecision balancer_decision = 1; } +message ListReplicationSinkServersRequest { +} + +message ListReplicationSinkServersResponse { + repeated ServerName server_name = 1; +} + service MasterService { /** Used by the client to get the number of regions that have received the updated schema */ rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest) @@ -1146,10 +1153,13 @@ service MasterService { returns (RenameRSGroupResponse); rpc UpdateRSGroupConfig(UpdateRSGroupConfigRequest) - returns (UpdateRSGroupConfigResponse); + returns (UpdateRSGroupConfigResponse); rpc GetLogEntries(LogRequest) returns(LogEntry); + + rpc ListReplicationSinkServers(ListReplicationSinkServersRequest) + returns (ListReplicationSinkServersResponse); } // HBCK Service definitions. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java index ac35caa8549b..ec009cc2de7b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java @@ -1782,4 +1782,20 @@ default void preHasUserPermissions(ObserverContext default void postHasUserPermissions(ObserverContext ctx, String userName, List permissions) throws IOException { } + + /** + * Called before getting servers for replication sink. + * @param ctx the coprocessor instance's environment + */ + default void preListReplicationSinkServers(ObserverContext ctx) + throws IOException { + } + + /** + * Called after getting servers for replication sink. + * @param ctx the coprocessor instance's environment + */ + default void postListReplicationSinkServers(ObserverContext ctx) + throws IOException { + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index e4bd3c5fce22..d1a8280703fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -3900,4 +3900,9 @@ public MetaRegionLocationCache getMetaRegionLocationCache() { public RSGroupInfoManager getRSGroupInfoManager() { return rsGroupInfoManager; } + + @Override + public List listReplicationSinkServers() throws IOException { + return this.serverManager.getOnlineServersList(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index 01d1a62d61b3..f775eba07e83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -2038,4 +2038,22 @@ public void call(MasterObserver observer) throws IOException { } }); } + + public void preListReplicationSinkServers() throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.preListReplicationSinkServers(this); + } + }); + } + + public void postListReplicationSinkServers() throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.postListReplicationSinkServers(this); + } + }); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 37fc58985e7b..314a2d6ba416 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -263,6 +263,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; @@ -3375,4 +3377,23 @@ private MasterProtos.BalancerDecisionsResponse getBalancerDecisions( .addAllBalancerDecision(balancerDecisions).build(); } + public ListReplicationSinkServersResponse listReplicationSinkServers( + RpcController controller, ListReplicationSinkServersRequest request) + throws ServiceException { + ListReplicationSinkServersResponse.Builder builder = + ListReplicationSinkServersResponse.newBuilder(); + try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preListReplicationSinkServers(); + } + builder.addAllServerName(master.listReplicationSinkServers().stream() + .map(ProtobufUtil::toServerName).collect(Collectors.toList())); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postListReplicationSinkServers(); + } + } catch (IOException e) { + throw new ServiceException(e); + } + return builder.build(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 908d21270c6e..7b212892e288 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -553,4 +553,10 @@ default SplitWALManager getSplitWALManager(){ * @return The state of the load balancer, or false if the load balancer isn't defined. */ boolean isBalancerOn(); + + /** + * Get a list of servers' addresses for replication sink. + * @return a list of servers' address + */ + List listReplicationSinkServers() throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 56564973fe60..48719b693a47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hbase.replication; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -28,21 +31,26 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin; import org.apache.hadoop.hbase.client.ClusterConnectionFactory; import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.zookeeper.ZKListener; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.AuthFailedException; import org.apache.zookeeper.KeeperException.ConnectionLossException; @@ -52,6 +60,12 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; /** * A {@link BaseReplicationEndpoint} for replication endpoints whose @@ -63,6 +77,13 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class); + public static final String FETCH_SERVERS_USE_ZK_CONF_KEY = + "hbase.replication.fetch.servers.usezk"; + + public static final String FETCH_SERVERS_INTERVAL_CONF_KEY = + "hbase.replication.fetch.servers.interval"; + public static final int DEFAULT_FETCH_SERVERS_INTERVAL = 10 * 60 * 1000; // 10 mins + private ZKWatcher zkw = null; protected Configuration conf; @@ -93,6 +114,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint private List sinkServers = new ArrayList<>(0); + private AsyncClusterConnection peerConnection; + private boolean fetchServersUseZk = false; + private FetchServersChore fetchServersChore; + private int shortOperationTimeout; + /* * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different * Connection implementations, or initialize it in a different way, so defining createConnection @@ -122,6 +148,19 @@ protected synchronized void disconnect() { if (zkw != null) { zkw.close(); } + if (fetchServersChore != null) { + ChoreService choreService = ctx.getServer().getChoreService(); + if (null != choreService) { + choreService.cancelChore(fetchServersChore); + } + } + if (peerConnection != null) { + try { + peerConnection.close(); + } catch (IOException e) { + LOG.warn("Attempt to close peerConnection failed.", e); + } + } } /** @@ -152,8 +191,27 @@ public void stop() { } @Override - protected void doStart() { + protected synchronized void doStart() { + this.shortOperationTimeout = ctx.getLocalConfiguration().getInt( + HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); try { + if (ctx.getLocalConfiguration().getBoolean(FETCH_SERVERS_USE_ZK_CONF_KEY, false)) { + fetchServersUseZk = true; + } else { + try { + if (ReplicationUtils.isPeerClusterSupportReplicationOffload(getPeerConnection())) { + fetchServersChore = new FetchServersChore(ctx.getServer(), this); + ctx.getServer().getChoreService().scheduleChore(fetchServersChore); + fetchServersUseZk = false; + } else { + fetchServersUseZk = true; + } + } catch (Throwable t) { + fetchServersUseZk = true; + LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.", + ctx.getPeerId(), t); + } + } reloadZkWatcher(); notifyStarted(); } catch (IOException e) { @@ -192,7 +250,9 @@ private synchronized void reloadZkWatcher() throws IOException { } zkw = new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this); - zkw.registerListener(new PeerRegionServerListener(this)); + if (fetchServersUseZk) { + zkw.registerListener(new PeerRegionServerListener(this)); + } } @Override @@ -207,12 +267,47 @@ public boolean isAborted() { return false; } + /** + * Get the connection to peer cluster + * @return connection to peer cluster + * @throws IOException If anything goes wrong connecting + */ + private synchronized AsyncClusterConnection getPeerConnection() throws IOException { + if (peerConnection == null) { + Configuration conf = ctx.getConfiguration(); + peerConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, + UserProvider.instantiate(conf).getCurrent()); + } + return peerConnection; + } + + /** + * Get the list of all the servers that are responsible for replication sink + * from the specified peer master + * @return list of server addresses or an empty list if the slave is unavailable + */ + protected List fetchSlavesAddresses() { + try { + AsyncClusterConnection peerConn = getPeerConnection(); + ServerName master = FutureUtils.get(peerConn.getAdmin().getMaster()); + MasterService.BlockingInterface masterStub = MasterService.newBlockingStub( + peerConn.getRpcClient() + .createBlockingRpcChannel(master, User.getCurrent(), shortOperationTimeout)); + ListReplicationSinkServersResponse resp = masterStub + .listReplicationSinkServers(null, ListReplicationSinkServersRequest.newBuilder().build()); + return ProtobufUtil.toServerNameList(resp.getServerNameList()); + } catch (ServiceException | IOException e) { + LOG.error("Peer {} fetches servers failed", ctx.getPeerId(), e); + } + return Collections.emptyList(); + } + /** * Get the list of all the region servers from the specified peer * * @return list of region server addresses or an empty list if the slave is unavailable */ - protected List fetchSlavesAddresses() { + protected List fetchSlavesAddressesByZK() { List children = null; try { children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode); @@ -233,7 +328,12 @@ protected List fetchSlavesAddresses() { } protected synchronized void chooseSinks() { - List slaveAddresses = fetchSlavesAddresses(); + List slaveAddresses = Collections.emptyList(); + if (fetchServersUseZk) { + slaveAddresses = fetchSlavesAddressesByZK(); + } else { + slaveAddresses = fetchSlavesAddresses(); + } if (slaveAddresses.isEmpty()) { LOG.warn("No sinks available at peer. Will not be able to replicate"); } @@ -264,6 +364,14 @@ protected synchronized SinkPeer getReplicationSink() throws IOException { return createSinkPeer(serverName); } + private SinkPeer createSinkPeer(ServerName serverName) throws IOException { + if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) { + return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName)); + } else { + return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName)); + } + } + /** * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it * failed). If a single SinkPeer is reported as bad more than @@ -373,11 +481,23 @@ public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId, } } - private SinkPeer createSinkPeer(ServerName serverName) throws IOException { - if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) { - return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName)); - } else { - return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName)); + /** + * Chore that will fetch the list of servers from peer master. + */ + public static class FetchServersChore extends ScheduledChore { + + private HBaseReplicationEndpoint endpoint; + + public FetchServersChore(Server server, HBaseReplicationEndpoint endpoint) { + super("Peer-" + endpoint.ctx.getPeerId() + "-FetchServersChore", server, + server.getConfiguration() + .getInt(FETCH_SERVERS_INTERVAL_CONF_KEY, DEFAULT_FETCH_SERVERS_INTERVAL)); + this.endpoint = endpoint; + } + + @Override + protected void chore() { + endpoint.chooseSinks(); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 879c60404155..2bf575d26dbd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -343,9 +343,9 @@ private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue q Threads.setDaemonThreadRunning( walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader." + walGroupId + "," + queueId, - (t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); + (t,e) -> this.uncaughtException(t, e, null, this.getPeerId())); worker.setWALReader(walReader); - worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); + worker.startup((t,e) -> this.uncaughtException(t, e, null, this.getPeerId())); return worker; } }); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index eb558e03d958..a1983e792b3a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -369,6 +369,6 @@ void clearWALEntryBatch() { LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.", totalToDecrement.longValue()); - source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue()); + source.controller.getTotalBufferUsed().addAndGet(-totalToDecrement.longValue()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 7c65005de55d..947ba5d7281a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -507,4 +507,9 @@ public RSGroupInfoManager getRSGroupInfoManager() { public boolean isBalancerOn() { return false; } + + @Override + public List listReplicationSinkServers() throws IOException { + return null; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java index 4182eaff799c..67657940e0cc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java @@ -198,6 +198,11 @@ public List fetchSlavesAddresses() { return regionServers; } + @Override + public List fetchSlavesAddressesByZK() { + return regionServers; + } + @Override public boolean replicate(ReplicateContext replicateContext) { return false; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java new file mode 100644 index 000000000000..9ceaceec6c19 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.MasterObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestReplicationFetchServers extends TestReplicationBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationFetchServers.class); + + private static AtomicBoolean fetchFlag = new AtomicBoolean(false); + + public static class MyObserver implements MasterCoprocessor, MasterObserver { + + @Override + public Optional getMasterObserver() { + return Optional.of(this); + } + + @Override + public void postListReplicationSinkServers(ObserverContext ctx) { + fetchFlag.set(true); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + CONF2.set(MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName()); + TestReplicationBase.setUpBeforeClass(); + } + + @Before + public void beforeMethod() { + fetchFlag.set(false); + } + + @Test + public void testMasterListReplicationPeerServers() throws IOException, ServiceException { + AsyncClusterConnection conn = UTIL2.getAsyncConnection(); + ServerName master = UTIL2.getAdmin().getMaster(); + MasterService.BlockingInterface masterStub = MasterService.newBlockingStub( + conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), 1000)); + ListReplicationSinkServersResponse resp = masterStub.listReplicationSinkServers( + null, ListReplicationSinkServersRequest.newBuilder().build()); + List servers = ProtobufUtil.toServerNameList(resp.getServerNameList()); + assertFalse(servers.isEmpty()); + assertTrue(fetchFlag.get()); + } + + @Test + public void testPutData() throws IOException { + htable1.put(new Put(row).addColumn(famName, famName, row)); + UTIL2.waitFor(30000L, () -> !htable2.get(new Get(row)).isEmpty()); + assertTrue(fetchFlag.get()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java index 1538fa360093..cfc9fa3652e7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java @@ -118,6 +118,10 @@ public static void setUpBeforeClass() throws Exception { @AfterClass public static void tearDownAfterClass() throws Exception { + Admin admin1 = utility1.getAdmin(); + admin1.removeReplicationPeer("peer1"); + admin1.removeReplicationPeer("peer2"); + admin1.removeReplicationPeer("peer3"); utility2.shutdownMiniCluster(); utility1.shutdownMiniCluster(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java index ee1ae5f380d5..c676e3096105 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java @@ -256,11 +256,13 @@ public void testRegionReplicaReplicationEndpointReplicate() throws Exception { ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class); when(context.getConfiguration()).thenReturn(HTU.getConfiguration()); + when(context.getLocalConfiguration()).thenReturn(HTU.getConfiguration()); when(context.getMetrics()).thenReturn(mock(MetricsSource.class)); when(context.getServer()).thenReturn(rs0); when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors()); replicator.init(context); replicator.startAsync(); + HTU.waitFor(30000, replicator::isRunning); //load some data to primary HTU.loadNumericRows(table, f, 0, 1000); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 5817c00d574e..1d9081ac0229 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -284,7 +284,7 @@ public void testTerminateClearsBuffer() throws Exception { ReplicationPeer mockPeer = mock(ReplicationPeer.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); - source.init(testConf, null, mockManager, null, mockPeer, null, + source.init(testConf, null, null, mockManager, null, mockPeer, null, "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class)); ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null, conf, null, 0, null, source); @@ -310,7 +310,7 @@ public void testTerminateClearsBuffer() throws Exception { reader.addEntryToBatch(batch, mockEntry); reader.entryBatchQueue.put(batch); source.terminate("test"); - assertEquals(0, source.getSourceManager().getTotalBufferUsed().get()); + assertEquals(0, source.controller.getTotalBufferUsed().get()); } /**