Skip to content

Commit

Permalink
HBASE-24684 Fetch ReplicationSink servers list from HMaster instead o…
Browse files Browse the repository at this point in the history
…f ZooKeeper
  • Loading branch information
ddupg committed Jul 23, 2020
1 parent fae9f0c commit 073dca0
Show file tree
Hide file tree
Showing 20 changed files with 362 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2448,4 +2448,11 @@ Pair<List<String>, List<TableName>> getConfiguredNamespacesAndTablesInRSGroup(St
*/
void updateRSGroupConfig(String groupName, Map<String, String> configuration) throws IOException;

/**
* Get a list of servers' addresses for replication sink.
* @return a list of servers' addresses
* @throws IOException if a remote or network exception occurs
*/
List<ServerName> listReplicationSinkServers() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -1052,4 +1052,9 @@ public void updateRSGroupConfig(String groupName, Map<String, String> configurat
throws IOException {
get(admin.updateRSGroupConfig(groupName, configuration));
}

@Override
public List<ServerName> listReplicationSinkServers() throws IOException {
return get(admin.listReplicationSinkServers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1655,4 +1655,10 @@ CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(final Set<ServerNam
* @throws IOException if a remote or network exception occurs
*/
CompletableFuture<Void> updateRSGroupConfig(String groupName, Map<String, String> configuration);

/**
* Get a list of servers' addresses for replication sink
* @return a list of servers' addresses
*/
CompletableFuture<List<ServerName>> listReplicationSinkServers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -929,4 +929,9 @@ public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
updateRSGroupConfig(String groupName, Map<String, String> configuration) {
return wrap(rawAdmin.updateRSGroupConfig(groupName, configuration));
}

@Override
public CompletableFuture<List<ServerName>> listReplicationSinkServers() {
return wrap(rawAdmin.listReplicationSinkServers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
Expand Down Expand Up @@ -4200,4 +4202,16 @@ this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, Void> call(
(s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
).call();
}

@Override
public CompletableFuture<List<ServerName>> listReplicationSinkServers() {
return this.<List<ServerName>> newMasterCaller()
.action(((controller, stub) ->
this.<ListReplicationSinkServersRequest, ListReplicationSinkServersResponse,
List<ServerName>>call(
controller, stub, ListReplicationSinkServersRequest.newBuilder().build(),
(s, c, req, done) -> s.listReplicationSinkServers(c, req, done),
resp -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
).call();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,13 @@ message SwitchExceedThrottleQuotaResponse {
required bool previous_exceed_throttle_quota_enabled = 1;
}

message ListReplicationSinkServersRequest {
}

message ListReplicationSinkServersResponse {
repeated ServerName server_name = 1;
}

service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
Expand Down Expand Up @@ -1119,7 +1126,10 @@ service MasterService {
returns (RenameRSGroupResponse);

rpc UpdateRSGroupConfig(UpdateRSGroupConfigRequest)
returns (UpdateRSGroupConfigResponse);
returns (UpdateRSGroupConfigResponse);

rpc ListReplicationSinkServers(ListReplicationSinkServersRequest)
returns (ListReplicationSinkServersResponse);
}

// HBCK Service definitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1784,4 +1784,20 @@ default void preHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment>
default void postHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
String userName, List<Permission> permissions) throws IOException {
}

/**
* Called before getting servers for replication sink.
* @param ctx the coprocessor instance's environment
*/
default void preListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}

/**
* Called after getting servers for replication sink.
* @param ctx the coprocessor instance's environment
*/
default void postListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3941,4 +3941,9 @@ public MetaRegionLocationCache getMetaRegionLocationCache() {
public RSGroupInfoManager getRSGroupInfoManager() {
return rsGroupInfoManager;
}

@Override
public List<ServerName> listReplicationSinkServers() throws IOException {
return this.serverManager.getOnlineServersList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2039,4 +2039,22 @@ public void call(MasterObserver observer) throws IOException {
}
});
}

public void preListReplicationSinkServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preListReplicationSinkServers(this);
}
});
}

public void postListReplicationSinkServers() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postListReplicationSinkServers(this);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
Expand Down Expand Up @@ -3274,4 +3276,25 @@ public UpdateRSGroupConfigResponse updateRSGroupConfig(RpcController controller,
}
return builder.build();
}

@Override
public ListReplicationSinkServersResponse listReplicationSinkServers(
RpcController controller, ListReplicationSinkServersRequest request)
throws ServiceException {
ListReplicationSinkServersResponse.Builder builder =
ListReplicationSinkServersResponse.newBuilder();
try {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preListReplicationSinkServers();
}
builder.addAllServerName(master.listReplicationSinkServers().stream()
.map(ProtobufUtil::toServerName).collect(Collectors.toList()));
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postListReplicationSinkServers();
}
} catch (IOException e) {
throw new ServiceException(e);
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -552,4 +552,10 @@ default SplitWALManager getSplitWALManager(){
* @return The state of the load balancer, or false if the load balancer isn't defined.
*/
boolean isBalancerOn();

/**
* Get a list of servers' addresses for replication sink.
* @return a list of servers' address
*/
List<ServerName> listReplicationSinkServers() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void init(Context context) throws IOException {
/**
* No-op implementation for subclasses to override if they wish to execute logic if their config changes
*/
public void peerConfigUpdated(ReplicationPeerConfig rpc){
public void peerConfigUpdated(ReplicationPeerConfig rpc) {

}

Expand Down
Loading

0 comments on commit 073dca0

Please sign in to comment.