Skip to content

Commit

Permalink
HBASE-24684 Fetch ReplicationSink servers list from HMaster instead o…
Browse files Browse the repository at this point in the history
…f ZooKeeper
  • Loading branch information
ddupg committed Jul 16, 2020
1 parent 9e8c930 commit 000e874
Show file tree
Hide file tree
Showing 16 changed files with 176 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2448,4 +2448,11 @@ Pair<List<String>, List<TableName>> getConfiguredNamespacesAndTablesInRSGroup(St
*/
void updateRSGroupConfig(String groupName, Map<String, String> configuration) throws IOException;

/**
* Get a list of servers' addresses for replication sink.
* @return a list of servers' addresses
* @throws IOException if a remote or network exception occurs
*/
List<ServerName> listReplicationSinkServers() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -1052,4 +1052,9 @@ public void updateRSGroupConfig(String groupName, Map<String, String> configurat
throws IOException {
get(admin.updateRSGroupConfig(groupName, configuration));
}

@Override
public List<ServerName> listReplicationSinkServers() throws IOException {
return get(admin.listReplicationSinkServers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1655,4 +1655,10 @@ CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(final Set<ServerNam
* @throws IOException if a remote or network exception occurs
*/
CompletableFuture<Void> updateRSGroupConfig(String groupName, Map<String, String> configuration);

/**
* Get a list of servers' addresses for replication sink
* @return a list of servers' addresses
*/
CompletableFuture<List<ServerName>> listReplicationSinkServers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -929,4 +929,9 @@ public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
updateRSGroupConfig(String groupName, Map<String, String> configuration) {
return wrap(rawAdmin.updateRSGroupConfig(groupName, configuration));
}

@Override
public CompletableFuture<List<ServerName>> listReplicationSinkServers() {
return wrap(rawAdmin.listReplicationSinkServers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
Expand Down Expand Up @@ -4200,4 +4202,16 @@ this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, Void> call(
(s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
).call();
}

@Override
public CompletableFuture<List<ServerName>> listReplicationSinkServers() {
return this.<List<ServerName>> newMasterCaller()
.action(((controller, stub) ->
this.<ListReplicationSinkServersRequest, ListReplicationSinkServersResponse,
List<ServerName>>call(
controller, stub, ListReplicationSinkServersRequest.newBuilder().build(),
(s, c, req, done) -> s.listReplicationSinkServers(c, req, done),
resp -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
).call();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,13 @@ message SwitchExceedThrottleQuotaResponse {
required bool previous_exceed_throttle_quota_enabled = 1;
}

message ListReplicationSinkServersRequest {
}

message ListReplicationSinkServersResponse {
repeated ServerName server_name = 1;
}

service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
Expand Down Expand Up @@ -1119,7 +1126,10 @@ service MasterService {
returns (RenameRSGroupResponse);

rpc UpdateRSGroupConfig(UpdateRSGroupConfigRequest)
returns (UpdateRSGroupConfigResponse);
returns (UpdateRSGroupConfigResponse);

rpc ListReplicationSinkServers(ListReplicationSinkServersRequest)
returns (ListReplicationSinkServersResponse);
}

// HBCK Service definitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1784,4 +1784,20 @@ default void preHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment>
default void postHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
String userName, List<Permission> permissions) throws IOException {
}

/**
* Called before getting servers for replication sink.
* @param ctx the coprocessor instance's environment
*/
default void preListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}

/**
* Called after getting servers for replication sink.
* @param ctx the coprocessor instance's environment
*/
default void postListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3941,4 +3941,9 @@ public MetaRegionLocationCache getMetaRegionLocationCache() {
public RSGroupInfoManager getRSGroupInfoManager() {
return rsGroupInfoManager;
}

@Override
public List<ServerName> listReplicationSinkServers() throws IOException {
return this.serverManager.getOnlineServersList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2039,4 +2039,22 @@ public void call(MasterObserver observer) throws IOException {
}
});
}

public void preListReplicationSinkServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preListReplicationSinkServers(this);
}
});
}

public void postListReplicationSinkServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postListReplicationSinkServers(this);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
Expand Down Expand Up @@ -3274,4 +3276,25 @@ public UpdateRSGroupConfigResponse updateRSGroupConfig(RpcController controller,
}
return builder.build();
}

@Override
public ListReplicationSinkServersResponse listReplicationSinkServers(
RpcController controller, ListReplicationSinkServersRequest request)
throws ServiceException {
ListReplicationSinkServersResponse.Builder builder =
ListReplicationSinkServersResponse.newBuilder();
try {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preListReplicationSinkServers();
}
builder.addAllServerName(master.listReplicationSinkServers().stream()
.map(ProtobufUtil::toServerName).collect(Collectors.toList()));
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postListReplicationSinkServers();
}
} catch (IOException e) {
throw new ServiceException(e);
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -552,4 +552,10 @@ default SplitWALManager getSplitWALManager(){
* @return The state of the load balancer, or false if the load balancer isn't defined.
*/
boolean isBalancerOn();

/**
* Get a list of servers' addresses for replication sink.
* @return a list of servers' address
*/
List<ServerName> listReplicationSinkServers() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void init(Context context) throws IOException {
/**
* No-op implementation for subclasses to override if they wish to execute logic if their config changes
*/
public void peerConfigUpdated(ReplicationPeerConfig rpc){
public void peerConfigUpdated(ReplicationPeerConfig rpc) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.AuthFailedException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
Expand All @@ -52,11 +54,19 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint

private List<ServerName> regionServers = new ArrayList<>(0);
private long lastRegionServerUpdate;
private Connection peerConnection;

protected synchronized void disconnect() {
if (zkw != null) {
zkw.close();
}
if (peerConnection != null) {
try {
peerConnection.close();
} catch (IOException e) {
LOG.warn("Attempt to close peerConnection failed.", e);
}
}
}

/**
Expand Down Expand Up @@ -130,7 +140,9 @@ protected synchronized ZKWatcher getZkw() {
* @throws IOException If anything goes wrong connecting
*/
synchronized void reloadZkWatcher() throws IOException {
if (zkw != null) zkw.close();
if (zkw != null) {
zkw.close();
}
zkw = new ZKWatcher(ctx.getConfiguration(),
"connection to cluster: " + ctx.getPeerId(), this);
getZkw().registerListener(new PeerRegionServerListener(this));
Expand All @@ -148,15 +160,34 @@ public boolean isAborted() {
return false;
}

/**
* Get the connection to peer cluster
* @return connection to peer cluster
* @throws IOException
*/
protected synchronized Connection getPeerConnection() throws IOException {
if (peerConnection == null) {
peerConnection = ConnectionFactory.createConnection(ctx.getConfiguration());
}
return peerConnection;
}

/**
* Get the list of all the region servers from the specified peer
* @param zkw zk connection to use
* @return list of region server addresses or an empty list if the slave is unavailable
*/
protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw)
throws KeeperException {
List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw,
zkw.getZNodePaths().rsZNode);
protected List<ServerName> fetchSlavesAddresses() throws KeeperException {
// fetch from master firstly
try (Admin admin = getPeerConnection().getAdmin()) {
return admin.listReplicationSinkServers();
} catch (IOException e) {
LOG.warn("Attempt to fetch servers from master of peer cluster failed."
+ " May be because the peer cluster doesn't support this way", e);
}
// fetch from zk secondly
ZKWatcher zk = getZkw();
List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zk,
zk.getZNodePaths().rsZNode);
if (children == null) {
return Collections.emptyList();
}
Expand All @@ -168,7 +199,7 @@ protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw)
}

/**
* Get a list of all the addresses of all the region servers
* Get a list of all the addresses of all servers that are responsible for replication sink
* for this peer cluster
* @return list of addresses
*/
Expand All @@ -178,7 +209,7 @@ protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw)
// over time.
public synchronized List<ServerName> getRegionServers() {
try {
setRegionServers(fetchSlavesAddresses(this.getZkw()));
setRegionServers(fetchSlavesAddresses());
} catch (KeeperException ke) {
if (LOG.isDebugEnabled()) {
LOG.debug("Fetch slaves addresses failed", ke);
Expand Down Expand Up @@ -225,7 +256,8 @@ public synchronized void nodeChildrenChanged(String path) {
if (path.equals(regionServerListNode)) {
try {
LOG.info("Detected change to peer region servers, fetching updated list");
replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw()));
replicationEndpoint.setRegionServers(
replicationEndpoint.fetchSlavesAddresses());
} catch (KeeperException e) {
LOG.error("Error reading slave addresses", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,4 +506,9 @@ public RSGroupInfoManager getRSGroupInfoManager() {
public boolean isBalancerOn() {
return false;
}

@Override
public List<ServerName> listReplicationSinkServers() throws IOException {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,11 @@ public void updateRSGroupConfig(String groupName, Map<String, String> configurat
verify();
}

@Override
public List<ServerName> listReplicationSinkServers() throws IOException {
return admin.listReplicationSinkServers();
}

private void verify() throws IOException {
Map<String, RSGroupInfo> groupMap = Maps.newHashMap();
Set<RSGroupInfo> zList = Sets.newHashSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1272,4 +1272,9 @@ public void updateRSGroupConfig(String groupName, Map<String, String> configurat
throws IOException {
throw new NotImplementedException("updateRSGroupConfig not supported in ThriftAdmin");
}

@Override
public List<ServerName> listReplicationSinkServers() throws IOException {
throw new NotImplementedException("listReplicationSinkServers not supported in ThriftAdmin");
}
}

0 comments on commit 000e874

Please sign in to comment.