Skip to content

Commit

Permalink
HBASE-27448 Add an admin method to get replication enabled state
Browse files Browse the repository at this point in the history
  • Loading branch information
2005hithlj committed Oct 28, 2022
1 parent 6d4e7fd commit dfba105
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2555,4 +2555,12 @@ List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType, Server
* Flush master local region
*/
void flushMasterStore() throws IOException;

/**
* Check if a replication peer is enabled.
* @param peerId id of replication peer to check
* @return <code>true</code> if replication peer is enabled
* @throws IOException if a remote or network exception occurs
*/
boolean isReplicationPeerEnabled(String peerId) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1084,4 +1084,9 @@ public List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType,
public void flushMasterStore() throws IOException {
get(admin.flushMasterStore());
}

@Override
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
return get(admin.isReplicationPeerEnabled(peerId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1776,4 +1776,12 @@ CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, Str
* Flush master local region
*/
CompletableFuture<Void> flushMasterStore();

/**
* Check if a replication peer is enabled.
* @param peerId id of replication peer to check
* @return true if replication peer is enabled. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -959,4 +959,9 @@ public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNam
public CompletableFuture<Void> flushMasterStore() {
return wrap(rawAdmin.flushMasterStore());
}

@Override
public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
return wrap(rawAdmin.isReplicationPeerEnabled(peerId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
Expand Down Expand Up @@ -4284,4 +4286,16 @@ Void> call(controller, stub, request.build(),
(s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
.call();
}

@Override
public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
request.setPeerId(peerId);
return this.<Boolean> newMasterCaller()
.action((controller, stub) -> this.<GetReplicationPeerStateRequest,
GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
(s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
resp -> resp.getIsEnabled()))
.call();
}
}
10 changes: 10 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,13 @@ message ModifyColumnStoreFileTrackerResponse {
message FlushMasterStoreRequest {}
message FlushMasterStoreResponse {}

message GetReplicationPeerStateRequest {
required string peer_id = 1;
}
message GetReplicationPeerStateResponse {
required bool is_enabled = 1;
}

service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
Expand Down Expand Up @@ -1203,6 +1210,9 @@ service MasterService {

rpc FlushMasterStore(FlushMasterStoreRequest)
returns(FlushMasterStoreResponse);

rpc IsReplicationPeerEnabled(GetReplicationPeerStateRequest)
returns(GetReplicationPeerStateResponse);
}

// HBCK Service definitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
Expand Down Expand Up @@ -3491,4 +3493,16 @@ public FlushMasterStoreResponse flushMasterStore(RpcController controller,
}
return FlushMasterStoreResponse.newBuilder().build();
}

@Override
public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller,
GetReplicationPeerStateRequest request) throws ServiceException {
boolean isEnabled;
try {
isEnabled = server.getReplicationPeerManager().getPeerState(request.getPeerId());
} catch (ReplicationException ioe) {
throw new ServiceException(ioe);
}
return GetReplicationPeerStateResponse.newBuilder().setIsEnabled(isEnabled).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,15 @@ private void setPeerState(String peerId, boolean enabled) throws ReplicationExce
desc.getSyncReplicationState()));
}

public boolean getPeerState(String peerId) throws ReplicationException {
ReplicationPeerDescription desc = peers.get(peerId);
if (desc != null) {
return desc.isEnabled();
} else {
throw new ReplicationException("Replication Peer of " + peerId + " does not exist.");
}
}

public void enablePeer(String peerId) throws ReplicationException {
setPeerState(peerId, true);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestGetReplicationPeerState extends TestReplicationBase {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestGetReplicationPeerState.class);

@Test
public void testGetReplicationPeerState() throws Exception {

// Test disable replication peer
hbaseAdmin.disableReplicationPeer("2");
assertFalse(hbaseAdmin.isReplicationPeerEnabled("2"));

// Test enable replication peer
hbaseAdmin.enableReplicationPeer("2");
assertTrue(hbaseAdmin.isReplicationPeerEnabled("2"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -939,4 +939,9 @@ public Future<Void> modifyTableStoreFileTrackerAsync(TableName tableName, String
public void flushMasterStore() throws IOException {
admin.flushMasterStore();
}

@Override
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
return admin.isReplicationPeerEnabled(peerId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1314,4 +1314,9 @@ public Future<Void> modifyTableStoreFileTrackerAsync(TableName tableName, String
public void flushMasterStore() throws IOException {
throw new NotImplementedException("flushMasterStore not supported in ThriftAdmin");
}

@Override
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
throw new NotImplementedException("isReplicationPeerEnabled not supported in ThriftAdmin");
}
}

0 comments on commit dfba105

Please sign in to comment.