Skip to content

Commit

Permalink
HBASE-24684 Fetch ReplicationSink servers list from HMaster instead o…
Browse files Browse the repository at this point in the history
…f ZooKeeper
  • Loading branch information
ddupg committed Sep 15, 2020
1 parent 4ec226e commit 4d25e7c
Show file tree
Hide file tree
Showing 12 changed files with 351 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,13 @@ message SwitchExceedThrottleQuotaResponse {
required bool previous_exceed_throttle_quota_enabled = 1;
}

message ListReplicationSinkServersRequest {
}

message ListReplicationSinkServersResponse {
repeated ServerName server_name = 1;
}

service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
Expand Down Expand Up @@ -1122,7 +1129,10 @@ service MasterService {
returns (RenameRSGroupResponse);

rpc UpdateRSGroupConfig(UpdateRSGroupConfigRequest)
returns (UpdateRSGroupConfigResponse);
returns (UpdateRSGroupConfigResponse);

rpc ListReplicationSinkServers(ListReplicationSinkServersRequest)
returns (ListReplicationSinkServersResponse);
}

// HBCK Service definitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1784,4 +1784,20 @@ default void preHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment>
default void postHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
String userName, List<Permission> permissions) throws IOException {
}

/**
* Called before getting servers for replication sink.
* @param ctx the coprocessor instance's environment
*/
default void preListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}

/**
* Called after getting servers for replication sink.
* @param ctx the coprocessor instance's environment
*/
default void postListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3907,4 +3907,9 @@ public MetaRegionLocationCache getMetaRegionLocationCache() {
public RSGroupInfoManager getRSGroupInfoManager() {
return rsGroupInfoManager;
}

@Override
public List<ServerName> listReplicationSinkServers() throws IOException {
return this.serverManager.getOnlineServersList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2039,4 +2039,22 @@ public void call(MasterObserver observer) throws IOException {
}
});
}

public void preListReplicationSinkServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preListReplicationSinkServers(this);
}
});
}

public void postListReplicationSinkServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postListReplicationSinkServers(this);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
Expand Down Expand Up @@ -3321,4 +3323,25 @@ public UpdateRSGroupConfigResponse updateRSGroupConfig(RpcController controller,
}
return builder.build();
}

@Override
public ListReplicationSinkServersResponse listReplicationSinkServers(
RpcController controller, ListReplicationSinkServersRequest request)
throws ServiceException {
ListReplicationSinkServersResponse.Builder builder =
ListReplicationSinkServersResponse.newBuilder();
try {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preListReplicationSinkServers();
}
builder.addAllServerName(master.listReplicationSinkServers().stream()
.map(ProtobufUtil::toServerName).collect(Collectors.toList()));
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postListReplicationSinkServers();
}
} catch (IOException e) {
throw new ServiceException(e);
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -553,4 +553,10 @@ default SplitWALManager getSplitWALManager(){
* @return The state of the load balancer, or false if the load balancer isn't defined.
*/
boolean isBalancerOn();

/**
* Get a list of servers' addresses for replication sink.
* @return a list of servers' address
*/
List<ServerName> listReplicationSinkServers() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,45 @@

package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.AuthFailedException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;

/**
* A {@link BaseReplicationEndpoint} for replication endpoints whose
* target cluster is an HBase cluster.
Expand All @@ -48,15 +67,39 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint

private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);

public static final String FETCH_SERVERS_USE_ZK_CONF_KEY =
"hbase.replication.fetch.servers.usezk";

public static final String FETCH_SERVERS_INTERVAL_CONF_KEY =
"hbase.replication.fetch.servers.interval";
public static final int DEFAULT_FETCH_SERVERS_INTERVAL = 10 * 60 * 1000; // 10 mins

private ZKWatcher zkw = null;

private List<ServerName> regionServers = new ArrayList<>(0);
private long lastRegionServerUpdate;
private AsyncClusterConnection peerConnection;
private boolean fetchServersUseZk = false;
private FetchServersChore fetchServersChore;
private int shortOperationTimeout;

protected synchronized void disconnect() {
if (zkw != null) {
zkw.close();
}
if (fetchServersChore != null) {
ChoreService choreService = ctx.getServer().getChoreService();
if (null != choreService) {
choreService.cancelChore(fetchServersChore);
}
}
if (peerConnection != null) {
try {
peerConnection.close();
} catch (IOException e) {
LOG.warn("Attempt to close peerConnection failed.", e);
}
}
}

/**
Expand Down Expand Up @@ -87,8 +130,27 @@ public void stop() {
}

@Override
protected void doStart() {
protected synchronized void doStart() {
this.shortOperationTimeout = ctx.getLocalConfiguration().getInt(
HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
try {
if (ctx.getLocalConfiguration().getBoolean(FETCH_SERVERS_USE_ZK_CONF_KEY, false)) {
fetchServersUseZk = true;
} else {
try {
if (ReplicationUtils.isPeerClusterSupportReplicationOffload(getPeerConnection())) {
fetchServersChore = new FetchServersChore(ctx.getServer(), this);
ctx.getServer().getChoreService().scheduleChore(fetchServersChore);
fetchServersUseZk = false;
} else {
fetchServersUseZk = true;
}
} catch (Throwable t) {
fetchServersUseZk = true;
LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.",
ctx.getPeerId(), t);
}
}
reloadZkWatcher();
notifyStarted();
} catch (IOException e) {
Expand Down Expand Up @@ -130,10 +192,14 @@ protected synchronized ZKWatcher getZkw() {
* @throws IOException If anything goes wrong connecting
*/
synchronized void reloadZkWatcher() throws IOException {
if (zkw != null) zkw.close();
if (zkw != null) {
zkw.close();
}
zkw = new ZKWatcher(ctx.getConfiguration(),
"connection to cluster: " + ctx.getPeerId(), this);
getZkw().registerListener(new PeerRegionServerListener(this));
if (fetchServersUseZk) {
getZkw().registerListener(new PeerRegionServerListener(this));
}
}

@Override
Expand All @@ -148,15 +214,48 @@ public boolean isAborted() {
return false;
}

/**
* Get the connection to peer cluster
* @return connection to peer cluster
* @throws IOException If anything goes wrong connecting
*/
protected synchronized AsyncClusterConnection getPeerConnection() throws IOException {
if (peerConnection == null) {
Configuration conf = ctx.getConfiguration();
peerConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
UserProvider.instantiate(conf).getCurrent());
}
return peerConnection;
}

/**
* Get the list of all the servers that are responsible for replication sink
* from the specified peer master
* @return list of server addresses or an empty list if the slave is unavailable
*/
protected List<ServerName> fetchSlavesAddresses() throws IOException {
AsyncClusterConnection peerConn = getPeerConnection();
ServerName master = FutureUtils.get(peerConn.getAdmin().getMaster());
MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
peerConn.getRpcClient()
.createBlockingRpcChannel(master, User.getCurrent(), shortOperationTimeout));
try {
ListReplicationSinkServersResponse resp = masterStub.listReplicationSinkServers(null,
ListReplicationSinkServersRequest.newBuilder().build());
return ProtobufUtil.toServerNameList(resp.getServerNameList());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}

/**
* Get the list of all the region servers from the specified peer
* @param zkw zk connection to use
* @return list of region server addresses or an empty list if the slave is unavailable
*/
protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw)
throws KeeperException {
List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw,
zkw.getZNodePaths().rsZNode);
protected List<ServerName> fetchSlavesAddressesByZK() throws KeeperException {
ZKWatcher zk = getZkw();
List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zk,
zk.getZNodePaths().rsZNode);
if (children == null) {
return Collections.emptyList();
}
Expand All @@ -168,22 +267,31 @@ protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw)
}

/**
* Get a list of all the addresses of all the available region servers
* for this peer cluster, or an empty list if no region servers available at peer cluster.
* Get a list of all the addresses of all the available servers that are responsible for
* replication sink for this peer cluster, or an empty list if no servers available at peer
* cluster.
* @return list of addresses
*/
// Synchronize peer cluster connection attempts to avoid races and rate
// limit connections when multiple replication sources try to connect to
// the peer cluster. If the peer cluster is down we can get out of control
// over time.
public synchronized List<ServerName> getRegionServers() {
try {
setRegionServers(fetchSlavesAddresses(this.getZkw()));
} catch (KeeperException ke) {
if (LOG.isDebugEnabled()) {
LOG.debug("Fetch slaves addresses failed", ke);
if (fetchServersUseZk) {
try {
setRegionServers(fetchSlavesAddressesByZK());
} catch (KeeperException ke) {
if (LOG.isDebugEnabled()) {
LOG.debug("Fetch slaves addresses failed", ke);
}
reconnect(ke);
}
} else {
try {
setRegionServers(fetchSlavesAddresses());
} catch (IOException e) {
LOG.warn("Fetch slaves addresses failed", e);
}
reconnect(ke);
}
return regionServers;
}
Expand Down Expand Up @@ -225,11 +333,35 @@ public synchronized void nodeChildrenChanged(String path) {
if (path.equals(regionServerListNode)) {
try {
LOG.info("Detected change to peer region servers, fetching updated list");
replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw()));
replicationEndpoint.setRegionServers(replicationEndpoint.fetchSlavesAddressesByZK());
} catch (KeeperException e) {
LOG.error("Error reading slave addresses", e);
}
}
}
}

/**
* Chore that will fetch the list of servers from peer master.
*/
public static class FetchServersChore extends ScheduledChore {

private HBaseReplicationEndpoint endpoint;

public FetchServersChore(Server server, HBaseReplicationEndpoint endpoint) {
super("Peer-" + endpoint.ctx.getPeerId() + "-FetchServersChore", server,
server.getConfiguration().getInt(FETCH_SERVERS_INTERVAL_CONF_KEY,
DEFAULT_FETCH_SERVERS_INTERVAL));
this.endpoint = endpoint;
}

@Override
protected void chore() {
try {
endpoint.setRegionServers(endpoint.fetchSlavesAddresses());
} catch (Throwable t) {
LOG.error("Peer {} fetches servers failed", endpoint.ctx.getPeerId(), t);
}
}
}
}
Loading

0 comments on commit 4d25e7c

Please sign in to comment.