Skip to content

Commit

Permalink
HBASE-24982 Disassemble the method replicateWALEntry from AdminServic…
Browse files Browse the repository at this point in the history
…e to a new interface ReplicationServerService (#2360)

Signed-off-by: Wellington Chevreuil <[email protected]>
  • Loading branch information
ddupg authored and infraio committed Sep 24, 2020
1 parent 4645d88 commit cd30e9b
Show file tree
Hide file tree
Showing 16 changed files with 292 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;

/**
* The implementation of AsyncConnection.
Expand Down Expand Up @@ -107,6 +108,8 @@ class AsyncConnectionImpl implements AsyncConnection {

private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ReplicationServerService.Interface> replStubs =
new ConcurrentHashMap<>();

private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();

Expand Down Expand Up @@ -266,12 +269,25 @@ private AdminService.Interface createAdminServerStub(ServerName serverName) thro
return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}

private ReplicationServerService.Interface createReplicationServerStub(ServerName serverName)
throws IOException {
return ReplicationServerService.newStub(
rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}

AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
getStubKey(AdminService.getDescriptor().getName(), serverName, hostnameCanChange),
() -> createAdminServerStub(serverName));
}

ReplicationServerService.Interface getReplicationServerStub(ServerName serverName)
throws IOException {
return ConcurrentMapUtils.computeIfAbsentEx(replStubs,
getStubKey(ReplicationServerService.Interface.class.getSimpleName(), serverName,
hostnameCanChange), () -> createReplicationServerStub(serverName));
}

CompletableFuture<MasterService.Interface> getMasterStub() {
return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
package hbase.pb;

option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "ReplicationServerProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "server/region/Admin.proto";

service ReplicationServerService {
rpc ReplicateWALEntry(ReplicateWALEntryRequest)
returns(ReplicateWALEntryResponse);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -212,4 +215,20 @@ public static int getAdaptiveTimeout(final int initialValue, final int retries)
}
return initialValue * HConstants.RETRY_BACKOFF[ntries];
}

/**
* Check whether peer cluster supports replication offload.
* @param peerConn connection for peer cluster
* @return true if peer cluster version >= 3
* @throws IOException exception
*/
public static boolean isPeerClusterSupportReplicationOffload(AsyncConnection peerConn)
throws IOException {
AsyncAdmin admin = peerConn.getAdmin();
String version = FutureUtils.get(admin.getClusterMetrics()).getHBaseVersion();
if (Integer.parseInt(version.split("\\.")[0]) >= 3) {
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public interface AsyncClusterConnection extends AsyncConnection {
*/
AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);

/**
* Get the admin service for the give replication server.
*/
AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName);

/**
* Get the nonce generator for this connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
return new AsyncRegionServerAdmin(serverName, this);
}

@Override
public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) {
return new AsyncReplicationServerAdmin(serverName, this);
}

@Override
public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
boolean writeFlushWALMarker) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;

/**
* A simple wrapper of the {@link ReplicationServerService} for a replication server.
* <p/>
* Notice that there is no retry, and this is intentional.
*/
@InterfaceAudience.Private
public class AsyncReplicationServerAdmin {

private final ServerName server;

private final AsyncConnectionImpl conn;

AsyncReplicationServerAdmin(ServerName server, AsyncConnectionImpl conn) {
this.server = server;
this.conn = conn;
}

@FunctionalInterface
private interface RpcCall<RESP> {
void call(ReplicationServerService.Interface stub, HBaseRpcController controller,
RpcCallback<RESP> done);
}

private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
CompletableFuture<RESP> future = new CompletableFuture<>();
HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
try {
rpcCall.call(conn.getReplicationServerStub(server), controller, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
future.complete(resp);
}
});
} catch (IOException e) {
future.completeExceptionally(e);
}
return future;
}

public CompletableFuture<AdminProtos.ReplicateWALEntryResponse> replicateWALEntry(
AdminProtos.ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) {
return call((stub, controller, done) -> {
controller.setCallTimeout(timeout);
stub.replicateWALEntry(controller, request, done);
}, cellScanner);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.FutureUtils;
Expand Down Expand Up @@ -61,6 +62,23 @@ public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entri
FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
}

/**
* A helper to replicate a list of WAL entries using replication server admin
* @param admin the replication server admin
* @param entries Array of WAL entries to be replicated
* @param replicationClusterId Id which will uniquely identify source cluster FS client
* configurations in the replication configuration directory
* @param sourceBaseNamespaceDir Path to source cluster base namespace directory
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
*/
public static void replicateWALEntry(AsyncReplicationServerAdmin admin, Entry[] entries,
String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
int timeout) throws IOException {
Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
}

/**
* Create a new ReplicateWALEntryRequest from a list of WAL entries
* @param entries the WAL entries to be replicated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
Expand All @@ -270,7 +271,7 @@
@SuppressWarnings("deprecation")
public class RSRpcServices implements HBaseRPCErrorHandler,
AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
ConfigurationObserver {
ConfigurationObserver, ReplicationServerService.BlockingInterface {
protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

/** RPC scheduler to use for the region server. */
Expand Down Expand Up @@ -1488,8 +1489,11 @@ protected List<BlockingServiceAndInterface> getServices() {
}
if (admin) {
bssi.add(new BlockingServiceAndInterface(
AdminService.newReflectiveBlockingService(this),
AdminService.BlockingInterface.class));
AdminService.newReflectiveBlockingService(this),
AdminService.BlockingInterface.class));
bssi.add(new BlockingServiceAndInterface(
ReplicationServerService.newReflectiveBlockingService(this),
ReplicationServerService.BlockingInterface.class));
}
return new org.apache.hbase.thirdparty.com.google.common.collect.
ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@
import java.util.concurrent.ThreadLocalRandom;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
Expand Down Expand Up @@ -257,7 +261,7 @@ protected synchronized SinkPeer getReplicationSink() throws IOException {
}
ServerName serverName =
sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
return createSinkPeer(serverName);
}

/**
Expand Down Expand Up @@ -320,21 +324,60 @@ public synchronized void nodeChildrenChanged(String path) {
/**
* Wraps a replication region server sink to provide the ability to identify it.
*/
public static class SinkPeer {
public static abstract class SinkPeer {
private ServerName serverName;
private AsyncRegionServerAdmin regionServer;

public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
public SinkPeer(ServerName serverName) {
this.serverName = serverName;
this.regionServer = regionServer;
}

ServerName getServerName() {
return serverName;
}

public AsyncRegionServerAdmin getRegionServer() {
return regionServer;
public abstract void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException;
}

public static class RegionServerSinkPeer extends SinkPeer {

private AsyncRegionServerAdmin regionServer;

public RegionServerSinkPeer(ServerName serverName,
AsyncRegionServerAdmin replicationServer) {
super(serverName);
this.regionServer = replicationServer;
}

public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException {
ReplicationProtobufUtil.replicateWALEntry(regionServer, entries, replicationClusterId,
sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout);
}
}

public static class ReplicationServerSinkPeer extends SinkPeer {

private AsyncReplicationServerAdmin replicationServer;

public ReplicationServerSinkPeer(ServerName serverName,
AsyncReplicationServerAdmin replicationServer) {
super(serverName);
this.replicationServer = replicationServer;
}

public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException {
ReplicationProtobufUtil.replicateWALEntry(replicationServer, entries, replicationClusterId,
sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout);
}
}

private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
} else {
return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
}
}
}
Loading

0 comments on commit cd30e9b

Please sign in to comment.