Skip to content

Commit

Permalink
HBASE-24684 Fetch ReplicationSink servers list from HMaster instead o… (
Browse files Browse the repository at this point in the history
#2077)

Signed-off-by: Wellington Chevreuil <[email protected]>
  • Loading branch information
ddupg authored and Guanghao Zhang committed Sep 26, 2020
1 parent 0dc4d5d commit 662f5e1
Show file tree
Hide file tree
Showing 15 changed files with 337 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,13 @@ message BalancerDecisionsResponse {
repeated BalancerDecision balancer_decision = 1;
}

message ListReplicationSinkServersRequest {
}

message ListReplicationSinkServersResponse {
repeated ServerName server_name = 1;
}

service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
Expand Down Expand Up @@ -1146,10 +1153,13 @@ service MasterService {
returns (RenameRSGroupResponse);

rpc UpdateRSGroupConfig(UpdateRSGroupConfigRequest)
returns (UpdateRSGroupConfigResponse);
returns (UpdateRSGroupConfigResponse);

rpc GetLogEntries(LogRequest)
returns(LogEntry);

rpc ListReplicationSinkServers(ListReplicationSinkServersRequest)
returns (ListReplicationSinkServersResponse);
}

// HBCK Service definitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1782,4 +1782,20 @@ default void preHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment>
default void postHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
String userName, List<Permission> permissions) throws IOException {
}

/**
* Called before getting servers for replication sink.
* @param ctx the coprocessor instance's environment
*/
default void preListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}

/**
* Called after getting servers for replication sink.
* @param ctx the coprocessor instance's environment
*/
default void postListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3900,4 +3900,9 @@ public MetaRegionLocationCache getMetaRegionLocationCache() {
public RSGroupInfoManager getRSGroupInfoManager() {
return rsGroupInfoManager;
}

@Override
public List<ServerName> listReplicationSinkServers() throws IOException {
return this.serverManager.getOnlineServersList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2038,4 +2038,22 @@ public void call(MasterObserver observer) throws IOException {
}
});
}

public void preListReplicationSinkServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preListReplicationSinkServers(this);
}
});
}

public void postListReplicationSinkServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postListReplicationSinkServers(this);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
Expand Down Expand Up @@ -3375,4 +3377,23 @@ private MasterProtos.BalancerDecisionsResponse getBalancerDecisions(
.addAllBalancerDecision(balancerDecisions).build();
}

public ListReplicationSinkServersResponse listReplicationSinkServers(
RpcController controller, ListReplicationSinkServersRequest request)
throws ServiceException {
ListReplicationSinkServersResponse.Builder builder =
ListReplicationSinkServersResponse.newBuilder();
try {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preListReplicationSinkServers();
}
builder.addAllServerName(master.listReplicationSinkServers().stream()
.map(ProtobufUtil::toServerName).collect(Collectors.toList()));
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postListReplicationSinkServers();
}
} catch (IOException e) {
throw new ServiceException(e);
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -553,4 +553,10 @@ default SplitWALManager getSplitWALManager(){
* @return The state of the load balancer, or false if the load balancer isn't defined.
*/
boolean isBalancerOn();

/**
* Get a list of servers' addresses for replication sink.
* @return a list of servers' address
*/
List<ServerName> listReplicationSinkServers() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -28,21 +31,26 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.AuthFailedException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
Expand All @@ -52,6 +60,12 @@

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;

/**
* A {@link BaseReplicationEndpoint} for replication endpoints whose
Expand All @@ -63,6 +77,13 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint

private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);

public static final String FETCH_SERVERS_USE_ZK_CONF_KEY =
"hbase.replication.fetch.servers.usezk";

public static final String FETCH_SERVERS_INTERVAL_CONF_KEY =
"hbase.replication.fetch.servers.interval";
public static final int DEFAULT_FETCH_SERVERS_INTERVAL = 10 * 60 * 1000; // 10 mins

private ZKWatcher zkw = null;

protected Configuration conf;
Expand Down Expand Up @@ -93,6 +114,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint

private List<ServerName> sinkServers = new ArrayList<>(0);

private AsyncClusterConnection peerConnection;
private boolean fetchServersUseZk = false;
private FetchServersChore fetchServersChore;
private int shortOperationTimeout;

/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
* Connection implementations, or initialize it in a different way, so defining createConnection
Expand Down Expand Up @@ -122,6 +148,19 @@ protected synchronized void disconnect() {
if (zkw != null) {
zkw.close();
}
if (fetchServersChore != null) {
ChoreService choreService = ctx.getServer().getChoreService();
if (null != choreService) {
choreService.cancelChore(fetchServersChore);
}
}
if (peerConnection != null) {
try {
peerConnection.close();
} catch (IOException e) {
LOG.warn("Attempt to close peerConnection failed.", e);
}
}
}

/**
Expand Down Expand Up @@ -152,8 +191,27 @@ public void stop() {
}

@Override
protected void doStart() {
protected synchronized void doStart() {
this.shortOperationTimeout = ctx.getLocalConfiguration().getInt(
HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
try {
if (ctx.getLocalConfiguration().getBoolean(FETCH_SERVERS_USE_ZK_CONF_KEY, false)) {
fetchServersUseZk = true;
} else {
try {
if (ReplicationUtils.isPeerClusterSupportReplicationOffload(getPeerConnection())) {
fetchServersChore = new FetchServersChore(ctx.getServer(), this);
ctx.getServer().getChoreService().scheduleChore(fetchServersChore);
fetchServersUseZk = false;
} else {
fetchServersUseZk = true;
}
} catch (Throwable t) {
fetchServersUseZk = true;
LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.",
ctx.getPeerId(), t);
}
}
reloadZkWatcher();
notifyStarted();
} catch (IOException e) {
Expand Down Expand Up @@ -192,7 +250,9 @@ private synchronized void reloadZkWatcher() throws IOException {
}
zkw = new ZKWatcher(ctx.getConfiguration(),
"connection to cluster: " + ctx.getPeerId(), this);
zkw.registerListener(new PeerRegionServerListener(this));
if (fetchServersUseZk) {
zkw.registerListener(new PeerRegionServerListener(this));
}
}

@Override
Expand All @@ -207,12 +267,47 @@ public boolean isAborted() {
return false;
}

/**
* Get the connection to peer cluster
* @return connection to peer cluster
* @throws IOException If anything goes wrong connecting
*/
private synchronized AsyncClusterConnection getPeerConnection() throws IOException {
if (peerConnection == null) {
Configuration conf = ctx.getConfiguration();
peerConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
UserProvider.instantiate(conf).getCurrent());
}
return peerConnection;
}

/**
* Get the list of all the servers that are responsible for replication sink
* from the specified peer master
* @return list of server addresses or an empty list if the slave is unavailable
*/
protected List<ServerName> fetchSlavesAddresses() {
try {
AsyncClusterConnection peerConn = getPeerConnection();
ServerName master = FutureUtils.get(peerConn.getAdmin().getMaster());
MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
peerConn.getRpcClient()
.createBlockingRpcChannel(master, User.getCurrent(), shortOperationTimeout));
ListReplicationSinkServersResponse resp = masterStub
.listReplicationSinkServers(null, ListReplicationSinkServersRequest.newBuilder().build());
return ProtobufUtil.toServerNameList(resp.getServerNameList());
} catch (ServiceException | IOException e) {
LOG.error("Peer {} fetches servers failed", ctx.getPeerId(), e);
}
return Collections.emptyList();
}

/**
* Get the list of all the region servers from the specified peer
*
* @return list of region server addresses or an empty list if the slave is unavailable
*/
protected List<ServerName> fetchSlavesAddresses() {
protected List<ServerName> fetchSlavesAddressesByZK() {
List<String> children = null;
try {
children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
Expand All @@ -233,7 +328,12 @@ protected List<ServerName> fetchSlavesAddresses() {
}

protected synchronized void chooseSinks() {
List<ServerName> slaveAddresses = fetchSlavesAddresses();
List<ServerName> slaveAddresses = Collections.emptyList();
if (fetchServersUseZk) {
slaveAddresses = fetchSlavesAddressesByZK();
} else {
slaveAddresses = fetchSlavesAddresses();
}
if (slaveAddresses.isEmpty()) {
LOG.warn("No sinks available at peer. Will not be able to replicate");
}
Expand Down Expand Up @@ -264,6 +364,14 @@ protected synchronized SinkPeer getReplicationSink() throws IOException {
return createSinkPeer(serverName);
}

private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
} else {
return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
}
}

/**
* Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
* failed). If a single SinkPeer is reported as bad more than
Expand Down Expand Up @@ -373,11 +481,23 @@ public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
}
}

private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
} else {
return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
/**
* Chore that will fetch the list of servers from peer master.
*/
public static class FetchServersChore extends ScheduledChore {

private HBaseReplicationEndpoint endpoint;

public FetchServersChore(Server server, HBaseReplicationEndpoint endpoint) {
super("Peer-" + endpoint.ctx.getPeerId() + "-FetchServersChore", server,
server.getConfiguration()
.getInt(FETCH_SERVERS_INTERVAL_CONF_KEY, DEFAULT_FETCH_SERVERS_INTERVAL));
this.endpoint = endpoint;
}

@Override
protected void chore() {
endpoint.chooseSinks();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,9 @@ private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> q
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId,
(t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
(t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
worker.setWALReader(walReader);
worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
worker.startup((t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
return worker;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,6 @@ void clearWALEntryBatch() {

LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
totalToDecrement.longValue());
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
source.controller.getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
}
}
Loading

0 comments on commit 662f5e1

Please sign in to comment.