HBASE-24684 Fetch ReplicationSink servers list from HMaster instead o… (#2077)

Signed-off-by: Wellington Chevreuil <[email protected]>
ddupg authored and infraio committed Oct 22, 2020
1 parent 4f77158 commit 1a64f1a
Showing 13 changed files with 334 additions and 16 deletions.
Master.proto

@@ -717,6 +717,13 @@ message BalancerDecisionsResponse {
repeated BalancerDecision balancer_decision = 1;
}

message ListReplicationSinkServersRequest {
}

message ListReplicationSinkServersResponse {
repeated ServerName server_name = 1;
}

service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@@ -1146,10 +1153,13 @@ service MasterService {
returns (RenameRSGroupResponse);

rpc UpdateRSGroupConfig(UpdateRSGroupConfigRequest)
returns (UpdateRSGroupConfigResponse);

rpc GetLogEntries(LogRequest)
returns(LogEntry);

rpc ListReplicationSinkServers(ListReplicationSinkServersRequest)
returns (ListReplicationSinkServersResponse);
}

// HBCK Service definitions.
MasterObserver.java

@@ -1782,4 +1782,20 @@ default void preHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment>
default void postHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
String userName, List<Permission> permissions) throws IOException {
}

/**
* Called before getting servers for replication sink.
* @param ctx the coprocessor instance's environment
*/
default void preListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}

/**
* Called after getting servers for replication sink.
* @param ctx the coprocessor instance's environment
*/
default void postListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
}
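
The two hooks above follow the standard MasterObserver pre/post pattern, so an access-control or auditing coprocessor can intercept the new RPC. A minimal sketch of such an observer, assuming nothing beyond the hooks added in this commit (the class name and log statements are illustrative, not part of the change):

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical example: audits every request for the sink servers list.
public class AuditSinkServersObserver implements MasterCoprocessor, MasterObserver {

  private static final Logger LOG = LoggerFactory.getLogger(AuditSinkServersObserver.class);

  @Override
  public Optional<MasterObserver> getMasterObserver() {
    return Optional.of(this);
  }

  @Override
  public void preListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
      throws IOException {
    // Throwing an IOException here vetoes the RPC before the server list is read.
    LOG.info("listReplicationSinkServers requested");
  }

  @Override
  public void postListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
      throws IOException {
    LOG.info("listReplicationSinkServers answered");
  }
}

Registering such a class via hbase.coprocessor.master.classes wires it into MasterCoprocessorHost, whose exec methods appear below.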
HMaster.java

@@ -3856,4 +3856,9 @@ public CompactionState getCompactionState(final TableName tableName) {
}
return compactionState;
}

@Override
public List<ServerName> listReplicationSinkServers() throws IOException {
return this.serverManager.getOnlineServersList();
}
}
MasterCoprocessorHost.java

@@ -2038,4 +2038,22 @@ public void call(MasterObserver observer) throws IOException {
}
});
}

public void preListReplicationSinkServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preListReplicationSinkServers(this);
}
});
}

public void postListReplicationSinkServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postListReplicationSinkServers(this);
}
});
}
}
MasterRpcServices.java

@@ -263,6 +263,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
@@ -3381,4 +3383,23 @@ private MasterProtos.BalancerDecisionsResponse getBalancerDecisions(
.addAllBalancerDecision(balancerDecisions).build();
}

public ListReplicationSinkServersResponse listReplicationSinkServers(
RpcController controller, ListReplicationSinkServersRequest request)
throws ServiceException {
ListReplicationSinkServersResponse.Builder builder =
ListReplicationSinkServersResponse.newBuilder();
try {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preListReplicationSinkServers();
}
builder.addAllServerName(master.listReplicationSinkServers().stream()
.map(ProtobufUtil::toServerName).collect(Collectors.toList()));
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postListReplicationSinkServers();
}
} catch (IOException e) {
throw new ServiceException(e);
}
return builder.build();
}
}
MasterServices.java

@@ -570,4 +570,10 @@ default SplitWALManager getSplitWALManager(){
*/
boolean normalizeRegions(
final NormalizeTableFilterParams ntfp, final boolean isHighPriority) throws IOException;

/**
* Get a list of server addresses that can serve as replication sinks.
* @return a list of server addresses
*/
List<ServerName> listReplicationSinkServers() throws IOException;
}
HBaseReplicationEndpoint.java

@@ -18,6 +18,9 @@

package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -28,21 +28,26 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.AuthFailedException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -52,6 +60,12 @@

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;

/**
* A {@link BaseReplicationEndpoint} for replication endpoints whose
Expand All @@ -63,6 +77,13 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint

private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);

public static final String FETCH_SERVERS_USE_ZK_CONF_KEY =
"hbase.replication.fetch.servers.usezk";

public static final String FETCH_SERVERS_INTERVAL_CONF_KEY =
"hbase.replication.fetch.servers.interval";
public static final int DEFAULT_FETCH_SERVERS_INTERVAL = 10 * 60 * 1000; // 10 mins

private ZKWatcher zkw = null;
private final Object zkwLock = new Object();

@@ -94,6 +115,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint

private List<ServerName> sinkServers = new ArrayList<>(0);

private AsyncClusterConnection peerConnection;
private boolean fetchServersUseZk = false;
private FetchServersChore fetchServersChore;
private int shortOperationTimeout;

/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating different
* Connection implementations, or initialize it in a different way, so defining createConnection
@@ -129,6 +155,19 @@ protected void disconnect() {
LOG.warn("{} Failed to close the connection", ctx.getPeerId());
}
}
if (fetchServersChore != null) {
ChoreService choreService = ctx.getServer().getChoreService();
if (null != choreService) {
choreService.cancelChore(fetchServersChore);
}
}
if (peerConnection != null) {
try {
peerConnection.close();
} catch (IOException e) {
LOG.warn("Attempt to close peerConnection failed.", e);
}
}
}

/**
@@ -159,8 +198,27 @@ public void stop() {
}

@Override
- protected void doStart() {
protected synchronized void doStart() {
this.shortOperationTimeout = ctx.getLocalConfiguration().getInt(
HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
try {
if (ctx.getLocalConfiguration().getBoolean(FETCH_SERVERS_USE_ZK_CONF_KEY, false)) {
fetchServersUseZk = true;
} else {
try {
if (ReplicationUtils.isPeerClusterSupportReplicationOffload(getPeerConnection())) {
fetchServersChore = new FetchServersChore(ctx.getServer(), this);
ctx.getServer().getChoreService().scheduleChore(fetchServersChore);
fetchServersUseZk = false;
} else {
fetchServersUseZk = true;
}
} catch (Throwable t) {
fetchServersUseZk = true;
LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.",
ctx.getPeerId(), t);
}
}
reloadZkWatcher();
connectPeerCluster();
notifyStarted();
@@ -203,7 +261,9 @@ private void reloadZkWatcher() throws IOException {
}
zkw = new ZKWatcher(ctx.getConfiguration(),
"connection to cluster: " + ctx.getPeerId(), this);
- zkw.registerListener(new PeerRegionServerListener(this));
if (fetchServersUseZk) {
zkw.registerListener(new PeerRegionServerListener(this));
}
}
}

@@ -228,12 +288,47 @@ public boolean isAborted() {
return false;
}

/**
* Get the connection to the peer cluster.
* @return connection to the peer cluster
* @throws IOException if anything goes wrong while connecting
*/
private synchronized AsyncClusterConnection getPeerConnection() throws IOException {
if (peerConnection == null) {
Configuration conf = ctx.getConfiguration();
peerConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
UserProvider.instantiate(conf).getCurrent());
}
return peerConnection;
}

/**
* Get the list of all servers currently acting as replication sinks,
* fetched from the peer cluster's active master.
* @return list of server addresses, or an empty list if the peer master is unavailable
*/
protected List<ServerName> fetchSlavesAddresses() {
try {
AsyncClusterConnection peerConn = getPeerConnection();
ServerName master = FutureUtils.get(peerConn.getAdmin().getMaster());
MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
peerConn.getRpcClient()
.createBlockingRpcChannel(master, User.getCurrent(), shortOperationTimeout));
ListReplicationSinkServersResponse resp = masterStub
.listReplicationSinkServers(null, ListReplicationSinkServersRequest.newBuilder().build());
return ProtobufUtil.toServerNameList(resp.getServerNameList());
} catch (ServiceException | IOException e) {
LOG.error("Peer {} fetches servers failed", ctx.getPeerId(), e);
}
return Collections.emptyList();
}

/**
* Get the list of all the region servers from the specified peer
*
* @return list of region server addresses or an empty list if the slave is unavailable
*/
- protected List<ServerName> fetchSlavesAddresses() {
protected List<ServerName> fetchSlavesAddressesByZK() {
List<String> children = null;
try {
synchronized (zkwLock) {
@@ -256,7 +351,12 @@ protected List<ServerName> fetchSlavesAddresses() {
}

protected synchronized void chooseSinks() {
- List<ServerName> slaveAddresses = fetchSlavesAddresses();
List<ServerName> slaveAddresses = Collections.emptyList();
if (fetchServersUseZk) {
slaveAddresses = fetchSlavesAddressesByZK();
} else {
slaveAddresses = fetchSlavesAddresses();
}
if (slaveAddresses.isEmpty()) {
LOG.warn("No sinks available at peer. Will not be able to replicate");
}
@@ -287,6 +387,14 @@ protected synchronized SinkPeer getReplicationSink() throws IOException {
return createSinkPeer(serverName);
}

private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
} else {
return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
}
}

/**
* Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
* failed). If a single SinkPeer is reported as bad more than
@@ -396,11 +504,23 @@ public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
}
}

- private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
-   if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
-     return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
-   } else {
-     return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
/**
* Chore that will fetch the list of servers from peer master.
*/
public static class FetchServersChore extends ScheduledChore {

private HBaseReplicationEndpoint endpoint;

public FetchServersChore(Server server, HBaseReplicationEndpoint endpoint) {
super("Peer-" + endpoint.ctx.getPeerId() + "-FetchServersChore", server,
server.getConfiguration()
.getInt(FETCH_SERVERS_INTERVAL_CONF_KEY, DEFAULT_FETCH_SERVERS_INTERVAL));
this.endpoint = endpoint;
}

@Override
protected void chore() {
endpoint.chooseSinks();
}
}
}
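
The two configuration keys declared at the top of this class govern how the endpoint discovers sinks. A minimal sketch of tuning them on the source cluster, assuming only the constants added in this commit (the chosen values are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;

public final class SinkDiscoveryConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // true: always list sink servers from the peer's ZooKeeper (pre-commit behavior).
    // false (the default): probe the peer master and use the new
    // ListReplicationSinkServers RPC when the peer supports replication offload,
    // falling back to ZooKeeper otherwise.
    conf.setBoolean(HBaseReplicationEndpoint.FETCH_SERVERS_USE_ZK_CONF_KEY, false);
    // Refresh period of FetchServersChore in milliseconds: 5 minutes here;
    // DEFAULT_FETCH_SERVERS_INTERVAL is 10 minutes.
    conf.setInt(HBaseReplicationEndpoint.FETCH_SERVERS_INTERVAL_CONF_KEY, 5 * 60 * 1000);
    System.out.println("usezk = " + conf.get(HBaseReplicationEndpoint.FETCH_SERVERS_USE_ZK_CONF_KEY));
  }
}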
ReplicationSource.java

@@ -344,9 +344,9 @@ private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> q
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId,
- (t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
(t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
worker.setWALReader(walReader);
- worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
worker.startup((t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
return worker;
}
});
MockNoopMasterServices.java

@@ -514,4 +514,9 @@ public boolean isBalancerOn() {
public boolean normalizeRegions(NormalizeTableFilterParams ntfp, boolean isHighPriority) {
return false;
}

@Override
public List<ServerName> listReplicationSinkServers() throws IOException {
return null;
}
}
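
Since the mock above is only a stub, exercising the real path needs a running master. A hedged mini-cluster sketch, assuming HBaseTestingUtility and a JUnit assertion that are not part of the shown diff; on the master side, listReplicationSinkServers() currently just returns the online region server list:

import static org.junit.Assert.assertEquals;

import java.util.List;

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;

public class ListReplicationSinkServersExample {
  public static void main(String[] args) throws Exception {
    HBaseTestingUtility util = new HBaseTestingUtility();
    util.startMiniCluster(3); // three region servers
    try {
      List<ServerName> sinks =
          util.getMiniHBaseCluster().getMaster().listReplicationSinkServers();
      // Expect one entry per online region server.
      assertEquals(3, sinks.size());
    } finally {
      util.shutdownMiniCluster();
    }
  }
}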