diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 68b343ae6af2..cbcf6d7b909d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -2448,4 +2448,11 @@ Pair, List> getConfiguredNamespacesAndTablesInRSGroup(St */ void updateRSGroupConfig(String groupName, Map configuration) throws IOException; + /** + * Get a list of servers' addresses for replication sink. + * @return a list of servers' addresses + * @throws IOException if a remote or network exception occurs + */ + List listReplicationSinkServers() throws IOException; + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java index 7533c091813c..99d7fb8bc3a8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java @@ -1052,4 +1052,9 @@ public void updateRSGroupConfig(String groupName, Map configurat throws IOException { get(admin.updateRSGroupConfig(groupName, configuration)); } + + @Override + public List listReplicationSinkServers() throws IOException { + return get(admin.listReplicationSinkServers()); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index f119c7ebea40..bc26cca5da69 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -1655,4 +1655,10 @@ CompletableFuture> getSlowLogResponses(final Set updateRSGroupConfig(String groupName, Map configuration); + + /** + * Get a list of servers' addresses for replication sink + * @return a list of servers' addresses + */ + CompletableFuture> listReplicationSinkServers(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 1a49919c057a..00a60c6ff549 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -929,4 +929,9 @@ public CompletableFuture renameRSGroup(String oldName, String newName) { updateRSGroupConfig(String groupName, Map configuration) { return wrap(rawAdmin.updateRSGroupConfig(groupName, configuration)); } + + @Override + public CompletableFuture> listReplicationSinkServers() { + return wrap(rawAdmin.listReplicationSinkServers()); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 63c177a7b6d6..39df141e0fd3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -221,6 +221,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; @@ -4200,4 +4202,16 @@ this. call( (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null)) ).call(); } + + @Override + public CompletableFuture> listReplicationSinkServers() { + return this.> newMasterCaller() + .action(((controller, stub) -> + this.>call( + controller, stub, ListReplicationSinkServersRequest.newBuilder().build(), + (s, c, req, done) -> s.listReplicationSinkServers(c, req, done), + resp -> ProtobufUtil.toServerNameList(resp.getServerNameList()))) + ).call(); + } } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto index 286c96f688fd..ced08529b692 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto @@ -690,6 +690,13 @@ message SwitchExceedThrottleQuotaResponse { required bool previous_exceed_throttle_quota_enabled = 1; } +message ListReplicationSinkServersRequest { +} + +message ListReplicationSinkServersResponse { + repeated ServerName server_name = 1; +} + service MasterService { /** Used by the client to get the number of regions that have received the updated schema */ rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest) @@ -1119,7 +1126,10 @@ service MasterService { returns (RenameRSGroupResponse); rpc UpdateRSGroupConfig(UpdateRSGroupConfigRequest) - returns (UpdateRSGroupConfigResponse); + returns (UpdateRSGroupConfigResponse); + + rpc ListReplicationSinkServers(ListReplicationSinkServersRequest) + returns (ListReplicationSinkServersResponse); } // HBCK Service definitions. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java index ffe4f53ac66f..0c5bb0d49411 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java @@ -1784,4 +1784,20 @@ default void preHasUserPermissions(ObserverContext default void postHasUserPermissions(ObserverContext ctx, String userName, List permissions) throws IOException { } + + /** + * Called before getting servers for replication sink. + * @param ctx the coprocessor instance's environment + */ + default void preListReplicationSinkServers(ObserverContext ctx) + throws IOException { + } + + /** + * Called after getting servers for replication sink. + * @param ctx the coprocessor instance's environment + */ + default void postListReplicationSinkServers(ObserverContext ctx) + throws IOException { + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index cc3a44f02c03..d00c8be3fe44 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -3941,4 +3941,9 @@ public MetaRegionLocationCache getMetaRegionLocationCache() { public RSGroupInfoManager getRSGroupInfoManager() { return rsGroupInfoManager; } + + @Override + public List listReplicationSinkServers() throws IOException { + return this.serverManager.getOnlineServersList(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index 28c9c3090911..3de4cac82f2b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -2039,4 +2039,22 @@ public void call(MasterObserver observer) throws IOException { } }); } + + public void preListReplicationSinkServers() throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.preListReplicationSinkServers(this); + } + }); + } + + public void postListReplicationSinkServers() throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.postListReplicationSinkServers(this); + } + }); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index fc29a381d47f..ce74e65cfa15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -248,6 +248,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; @@ -3274,4 +3276,25 @@ public UpdateRSGroupConfigResponse updateRSGroupConfig(RpcController controller, } return builder.build(); } + + @Override + public ListReplicationSinkServersResponse listReplicationSinkServers( + RpcController controller, ListReplicationSinkServersRequest request) + throws ServiceException { + ListReplicationSinkServersResponse.Builder builder = + ListReplicationSinkServersResponse.newBuilder(); + try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preListReplicationSinkServers(); + } + builder.addAllServerName(master.listReplicationSinkServers().stream() + .map(ProtobufUtil::toServerName).collect(Collectors.toList())); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postListReplicationSinkServers(); + } + } catch (IOException e) { + throw new ServiceException(e); + } + return builder.build(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 02e9dce7154c..1dc800843a76 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -552,4 +552,10 @@ default SplitWALManager getSplitWALManager(){ * @return The state of the load balancer, or false if the load balancer isn't defined. */ boolean isBalancerOn(); + + /** + * Get a list of servers' addresses for replication sink. + * @return a list of servers' address + */ + List listReplicationSinkServers() throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index 56576a6cf3e1..0747e9aa5c3a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -61,7 +61,7 @@ public void init(Context context) throws IOException { /** * No-op implementation for subclasses to override if they wish to execute logic if their config changes */ - public void peerConfigUpdated(ReplicationPeerConfig rpc){ + public void peerConfigUpdated(ReplicationPeerConfig rpc) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 1ca70ad85dd3..a65a409425c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -23,14 +23,16 @@ import java.util.Collections; import java.util.List; import java.util.UUID; - -import org.apache.hadoop.hbase.zookeeper.ZKListener; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.AuthFailedException; import org.apache.zookeeper.KeeperException.ConnectionLossException; @@ -52,11 +54,19 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint private List regionServers = new ArrayList<>(0); private long lastRegionServerUpdate; + private Connection peerConnection; protected synchronized void disconnect() { if (zkw != null) { zkw.close(); } + if (peerConnection != null) { + try { + peerConnection.close(); + } catch (IOException e) { + LOG.warn("Attempt to close peerConnection failed.", e); + } + } } /** @@ -130,7 +140,9 @@ protected synchronized ZKWatcher getZkw() { * @throws IOException If anything goes wrong connecting */ synchronized void reloadZkWatcher() throws IOException { - if (zkw != null) zkw.close(); + if (zkw != null) { + zkw.close(); + } zkw = new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this); getZkw().registerListener(new PeerRegionServerListener(this)); @@ -148,15 +160,34 @@ public boolean isAborted() { return false; } + /** + * Get the connection to peer cluster + * @return connection to peer cluster + * @throws IOException + */ + protected synchronized Connection getPeerConnection() throws IOException { + if (peerConnection == null) { + peerConnection = ConnectionFactory.createConnection(ctx.getConfiguration()); + } + return peerConnection; + } + /** * Get the list of all the region servers from the specified peer - * @param zkw zk connection to use * @return list of region server addresses or an empty list if the slave is unavailable */ - protected static List fetchSlavesAddresses(ZKWatcher zkw) - throws KeeperException { - List children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, - zkw.getZNodePaths().rsZNode); + protected List fetchSlavesAddresses() throws KeeperException { + // fetch from master firstly + try (Admin admin = getPeerConnection().getAdmin()) { + return admin.listReplicationSinkServers(); + } catch (IOException e) { + LOG.warn("Attempt to fetch servers from master of peer cluster failed." + + " May be because the peer cluster doesn't support this way", e); + } + // fetch from zk secondly + ZKWatcher zk = getZkw(); + List children = ZKUtil.listChildrenAndWatchForNewChildren(zk, + zk.getZNodePaths().rsZNode); if (children == null) { return Collections.emptyList(); } @@ -168,7 +199,7 @@ protected static List fetchSlavesAddresses(ZKWatcher zkw) } /** - * Get a list of all the addresses of all the region servers + * Get a list of all the addresses of all servers that are responsible for replication sink * for this peer cluster * @return list of addresses */ @@ -178,7 +209,7 @@ protected static List fetchSlavesAddresses(ZKWatcher zkw) // over time. public synchronized List getRegionServers() { try { - setRegionServers(fetchSlavesAddresses(this.getZkw())); + setRegionServers(fetchSlavesAddresses()); } catch (KeeperException ke) { if (LOG.isDebugEnabled()) { LOG.debug("Fetch slaves addresses failed", ke); @@ -225,7 +256,8 @@ public synchronized void nodeChildrenChanged(String path) { if (path.equals(regionServerListNode)) { try { LOG.info("Detected change to peer region servers, fetching updated list"); - replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw())); + replicationEndpoint.setRegionServers( + replicationEndpoint.fetchSlavesAddresses()); } catch (KeeperException e) { LOG.error("Error reading slave addresses", e); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 50851c15d5a1..5d4a76c4d634 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -506,4 +506,9 @@ public RSGroupInfoManager getRSGroupInfoManager() { public boolean isBalancerOn() { return false; } + + @Override + public List listReplicationSinkServers() throws IOException { + return null; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java index 6d0574a7b445..4155b3908e63 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java @@ -833,6 +833,11 @@ public void updateRSGroupConfig(String groupName, Map configurat verify(); } + @Override + public List listReplicationSinkServers() throws IOException { + return admin.listReplicationSinkServers(); + } + private void verify() throws IOException { Map groupMap = Maps.newHashMap(); Set zList = Sets.newHashSet(); diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java index de0efc2e1eed..e33581f377f5 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java @@ -1272,4 +1272,9 @@ public void updateRSGroupConfig(String groupName, Map configurat throws IOException { throw new NotImplementedException("updateRSGroupConfig not supported in ThriftAdmin"); } + + @Override + public List listReplicationSinkServers() throws IOException { + throw new NotImplementedException("listReplicationSinkServers not supported in ThriftAdmin"); + } }