diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 97b70e1a7ad8..f5b2f278826c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService; /** * The implementation of AsyncConnection. @@ -107,6 +108,8 @@ class AsyncConnectionImpl implements AsyncConnection { private final ConcurrentMap rsStubs = new ConcurrentHashMap<>(); private final ConcurrentMap adminSubs = new ConcurrentHashMap<>(); + private final ConcurrentMap replStubs = + new ConcurrentHashMap<>(); private final AtomicReference masterStub = new AtomicReference<>(); @@ -266,12 +269,25 @@ private AdminService.Interface createAdminServerStub(ServerName serverName) thro return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); } + private ReplicationServerService.Interface createReplicationServerStub(ServerName serverName) + throws IOException { + return ReplicationServerService.newStub( + rpcClient.createRpcChannel(serverName, user, rpcTimeout)); + } + AdminService.Interface getAdminStub(ServerName serverName) throws IOException { return ConcurrentMapUtils.computeIfAbsentEx(adminSubs, getStubKey(AdminService.Interface.class.getSimpleName(), serverName, hostnameCanChange), () -> createAdminServerStub(serverName)); } + ReplicationServerService.Interface getReplicationServerStub(ServerName serverName) + throws IOException { + return ConcurrentMapUtils.computeIfAbsentEx(replStubs, + getStubKey(ReplicationServerService.Interface.class.getSimpleName(), serverName, + hostnameCanChange), () -> createReplicationServerStub(serverName)); + } + CompletableFuture getMasterStub() { return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> { CompletableFuture future = new CompletableFuture<>(); diff --git a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto new file mode 100644 index 000000000000..ed334c476768 --- /dev/null +++ b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto2"; +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated"; +option java_outer_classname = "ReplicationServerProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "server/region/Admin.proto"; + +service ReplicationServerService { + rpc ReplicateWALEntry(ReplicateWALEntryRequest) + returns(ReplicateWALEntryResponse); +} \ No newline at end of file diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index a786206a642e..7bafbc235290 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -30,6 +30,9 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncAdmin; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -212,4 +215,20 @@ public static int getAdaptiveTimeout(final int initialValue, final int retries) } return initialValue * HConstants.RETRY_BACKOFF[ntries]; } + + /** + * Check whether peer cluster supports replication offload. + * @param peerConn connection for peer cluster + * @return true if peer cluster version >= 3 + * @throws IOException exception + */ + public static boolean isPeerClusterSupportReplicationOffload(AsyncConnection peerConn) + throws IOException { + AsyncAdmin admin = peerConn.getAdmin(); + String version = FutureUtils.get(admin.getClusterMetrics()).getHBaseVersion(); + if (Integer.parseInt(version.split("\\.")[0]) >= 3) { + return true; + } + return false; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java index 92118ac444f3..b6a3b9708827 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java @@ -41,6 +41,11 @@ public interface AsyncClusterConnection extends AsyncConnection { */ AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName); + /** + * Get the admin service for the give replication server. + */ + AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName); + /** * Get the nonce generator for this connection. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java index 39fc3a28e0c7..e4c2ee3ac18b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java @@ -70,6 +70,11 @@ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) { return new AsyncRegionServerAdmin(serverName, this); } + @Override + public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) { + return new AsyncReplicationServerAdmin(serverName, this); + } + @Override public CompletableFuture flush(byte[] regionName, boolean writeFlushWALMarker) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java new file mode 100644 index 000000000000..7511a6423772 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService; + +/** + * A simple wrapper of the {@link ReplicationServerService} for a replication server. + *

+ * Notice that there is no retry, and this is intentional. + */ +@InterfaceAudience.Private +public class AsyncReplicationServerAdmin { + + private final ServerName server; + + private final AsyncConnectionImpl conn; + + AsyncReplicationServerAdmin(ServerName server, AsyncConnectionImpl conn) { + this.server = server; + this.conn = conn; + } + + @FunctionalInterface + private interface RpcCall { + void call(ReplicationServerService.Interface stub, HBaseRpcController controller, + RpcCallback done); + } + + private CompletableFuture call(RpcCall rpcCall, CellScanner cellScanner) { + CompletableFuture future = new CompletableFuture<>(); + HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner); + try { + rpcCall.call(conn.getReplicationServerStub(server), controller, resp -> { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + future.complete(resp); + } + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + public CompletableFuture replicateWALEntry( + AdminProtos.ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) { + return call((stub, controller, done) -> { + controller.setCallTimeout(timeout); + stub.replicateWALEntry(controller, request, done); + }, cellScanner); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java index e47c92914f0d..17f48a644cc7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin; import org.apache.hadoop.hbase.io.SizedCellScanner; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.util.FutureUtils; @@ -61,6 +62,23 @@ public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entri FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout)); } + /** + * A helper to replicate a list of WAL entries using replication server admin + * @param admin the replication server admin + * @param entries Array of WAL entries to be replicated + * @param replicationClusterId Id which will uniquely identify source cluster FS client + * configurations in the replication configuration directory + * @param sourceBaseNamespaceDir Path to source cluster base namespace directory + * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory + */ + public static void replicateWALEntry(AsyncReplicationServerAdmin admin, Entry[] entries, + String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, + int timeout) throws IOException { + Pair p = buildReplicateWALEntryRequest(entries, null, + replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir); + FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout)); + } + /** * Create a new ReplicateWALEntryRequest from a list of WAL entries * @param entries the WAL entries to be replicated diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index be64966570f1..f509ee3f9dd8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -257,6 +257,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService; import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; @@ -270,7 +271,7 @@ @SuppressWarnings("deprecation") public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction, - ConfigurationObserver { + ConfigurationObserver, ReplicationServerService.BlockingInterface { protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); /** RPC scheduler to use for the region server. */ @@ -1488,8 +1489,11 @@ protected List getServices() { } if (admin) { bssi.add(new BlockingServiceAndInterface( - AdminService.newReflectiveBlockingService(this), - AdminService.BlockingInterface.class)); + AdminService.newReflectiveBlockingService(this), + AdminService.BlockingInterface.class)); + bssi.add(new BlockingServiceAndInterface( + ReplicationServerService.newReflectiveBlockingService(this), + ReplicationServerService.BlockingInterface.class)); } return new org.apache.hbase.thirdparty.com.google.common.collect. ImmutableList.Builder().addAll(bssi).build(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 850a79125562..56564973fe60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -27,11 +27,15 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin; import org.apache.hadoop.hbase.client.ClusterConnectionFactory; +import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.Abortable; @@ -257,7 +261,7 @@ protected synchronized SinkPeer getReplicationSink() throws IOException { } ServerName serverName = sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size())); - return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName)); + return createSinkPeer(serverName); } /** @@ -320,21 +324,60 @@ public synchronized void nodeChildrenChanged(String path) { /** * Wraps a replication region server sink to provide the ability to identify it. */ - public static class SinkPeer { + public static abstract class SinkPeer { private ServerName serverName; - private AsyncRegionServerAdmin regionServer; - public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) { + public SinkPeer(ServerName serverName) { this.serverName = serverName; - this.regionServer = regionServer; } ServerName getServerName() { return serverName; } - public AsyncRegionServerAdmin getRegionServer() { - return regionServer; + public abstract void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId, + Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException; + } + + public static class RegionServerSinkPeer extends SinkPeer { + + private AsyncRegionServerAdmin regionServer; + + public RegionServerSinkPeer(ServerName serverName, + AsyncRegionServerAdmin replicationServer) { + super(serverName); + this.regionServer = replicationServer; + } + + public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId, + Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException { + ReplicationProtobufUtil.replicateWALEntry(regionServer, entries, replicationClusterId, + sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout); + } + } + + public static class ReplicationServerSinkPeer extends SinkPeer { + + private AsyncReplicationServerAdmin replicationServer; + + public ReplicationServerSinkPeer(ServerName serverName, + AsyncReplicationServerAdmin replicationServer) { + super(serverName); + this.replicationServer = replicationServer; + } + + public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId, + Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException { + ReplicationProtobufUtil.replicateWALEntry(replicationServer, entries, replicationClusterId, + sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout); + } + } + + private SinkPeer createSinkPeer(ServerName serverName) throws IOException { + if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) { + return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName)); + } else { + return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName)); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java index 1b9b69939f0f..15d4f8c1a789 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java @@ -27,14 +27,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.PriorityFunction; -import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerFactory; import org.apache.hadoop.hbase.ipc.RpcServerInterface; @@ -58,53 +56,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.protobuf.Message; @@ -117,7 +73,7 @@ @InterfaceAudience.Private @SuppressWarnings("deprecation") public class ReplicationServerRpcServices implements HBaseRPCErrorHandler, - AdminService.BlockingInterface, PriorityFunction { + ReplicationServerService.BlockingInterface, PriorityFunction { protected static final Logger LOG = LoggerFactory.getLogger(ReplicationServerRpcServices.class); @@ -256,8 +212,8 @@ void stop() { protected List getServices() { List bssi = new ArrayList<>(); bssi.add(new BlockingServiceAndInterface( - AdminService.newReflectiveBlockingService(this), - AdminService.BlockingInterface.class)); + ReplicationServerService.newReflectiveBlockingService(this), + ReplicationServerService.BlockingInterface.class)); return new ImmutableList.Builder().addAll(bssi).build(); } @@ -325,154 +281,6 @@ protected void checkOpen() throws IOException { } } - @Override - public GetRegionInfoResponse getRegionInfo(RpcController controller, GetRegionInfoRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public GetOnlineRegionResponse getOnlineRegion(RpcController controller, - GetOnlineRegionRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public WarmupRegionResponse warmupRegion(RpcController controller, WarmupRegionRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public CompactionSwitchResponse compactionSwitch(RpcController controller, - CompactionSwitchRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public CompactRegionResponse compactRegion(RpcController controller, - CompactRegionRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public ReplicateWALEntryResponse replay(RpcController controller, - ReplicateWALEntryRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public RollWALWriterResponse rollWALWriter(RpcController controller, RollWALWriterRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public GetServerInfoResponse getServerInfo(RpcController controller, GetServerInfoRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - /** - * Stop the replication server. - * - * @param controller the RPC controller - * @param request the request - */ - @Override - @QosPriority(priority=HConstants.ADMIN_QOS) - public StopServerResponse stopServer(final RpcController controller, - final StopServerRequest request) { - requestCount.increment(); - String reason = request.getReason(); - replicationServer.stop(reason); - return StopServerResponse.newBuilder().build(); - } - - @Override - public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, - UpdateFavoredNodesRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public UpdateConfigurationResponse updateConfiguration(RpcController controller, - UpdateConfigurationRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public GetRegionLoadResponse getRegionLoad(RpcController controller, - GetRegionLoadRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, - ClearCompactionQueuesRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, - ClearRegionBlockCacheRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller, - GetSpaceQuotaSnapshotsRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public ExecuteProceduresResponse executeProcedures(RpcController controller, - ExecuteProceduresRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public SlowLogResponses getSlowLogResponses(RpcController controller, - SlowLogResponseRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public SlowLogResponses getLargeLogResponses(RpcController controller, - SlowLogResponseRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller, - ClearSlowLogResponseRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - protected AccessChecker getAccessChecker() { return accessChecker; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index b6e1f69173fe..68eb158feac9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -50,12 +50,10 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.AsyncClusterConnection; -import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.client.ClusterConnectionFactory; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; @@ -600,11 +598,9 @@ protected int replicateEntries(List entries, int batchIndex, int timeout) logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId); } sinkPeer = getReplicationSink(); - AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer(); try { - ReplicationProtobufUtil.replicateWALEntry(rsAdmin, - entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir, - hfileArchiveDir, timeout); + sinkPeer.replicateWALEntry(entries.toArray(new Entry[entries.size()]), replicationClusterId, + baseNamespaceDir, hfileArchiveDir, timeout); if (LOG.isTraceEnabled()) { LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index c72923716c6e..40db9a582144 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -441,7 +441,7 @@ protected final void uncaughtException(Thread t, Throwable e, t.getName()); manager.refreshSources(peerId); break; - } catch (IOException e1) { + } catch (ReplicationException | IOException e1) { LOG.error("Replication sources refresh failed.", e1); sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java index 87557499f7ea..5af408622593 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java @@ -108,6 +108,11 @@ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) { return null; } + @Override + public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) { + return null; + } + @Override public NonceGenerator getNonceGenerator() { return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java index 41601417a9d4..4182eaff799c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java @@ -28,7 +28,8 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.AsyncClusterConnection; -import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin; +import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -99,7 +100,7 @@ public void testReportBadSink() { // Sanity check assertEquals(1, endpoint.getNumSinks()); - SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class)); + SinkPeer sinkPeer = mockSinkPeer(serverNameA); endpoint.reportBadSink(sinkPeer); // Just reporting a bad sink once shouldn't have an effect assertEquals(1, endpoint.getNumSinks()); @@ -123,7 +124,7 @@ public void testReportBadSinkPastThreshold() { assertEquals(expected, endpoint.getNumSinks()); ServerName badSinkServer0 = endpoint.getSinkServers().get(0); - SinkPeer sinkPeer = new SinkPeer(badSinkServer0, mock(AsyncRegionServerAdmin.class)); + SinkPeer sinkPeer = mockSinkPeer(badSinkServer0); for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) { endpoint.reportBadSink(sinkPeer); } @@ -133,7 +134,7 @@ public void testReportBadSinkPastThreshold() { // now try a sink that has some successes ServerName badSinkServer1 = endpoint.getSinkServers().get(0); - sinkPeer = new SinkPeer(badSinkServer1, mock(AsyncRegionServerAdmin.class)); + sinkPeer = mockSinkPeer(badSinkServer1); for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) { endpoint.reportBadSink(sinkPeer); } @@ -168,8 +169,8 @@ public void testReportBadSinkDownToZeroSinks() { ServerName serverNameA = endpoint.getSinkServers().get(0); ServerName serverNameB = endpoint.getSinkServers().get(1); - SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class)); - SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class)); + SinkPeer sinkPeerA = mockSinkPeer(serverNameA); + SinkPeer sinkPeerB = mockSinkPeer(serverNameB); for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) { endpoint.reportBadSink(sinkPeerA); @@ -207,4 +208,8 @@ public AsyncClusterConnection createConnection(Configuration conf) throws IOExce return null; } } + + private SinkPeer mockSinkPeer(ServerName serverName) { + return new ReplicationServerSinkPeer(serverName, mock(AsyncReplicationServerAdmin.class)); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java index 6a0ef3d15cbd..0ef23f2a4ccc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java @@ -30,14 +30,15 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncClusterConnection; -import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; +import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -116,22 +117,48 @@ public void after() throws IOException { TEST_UTIL.deleteTableIfAny(TABLENAME); } + /** + * Requests replication server using {@link AsyncReplicationServerAdmin} + */ @Test public void testReplicateWAL() throws Exception { - AsyncClusterConnection conn = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0) - .getRegionServer().getAsyncClusterConnection(); - AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(replicationServer.getServerName()); + AsyncClusterConnection conn = + TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection(); + AsyncReplicationServerAdmin replAdmin = + conn.getReplicationServerAdmin(replicationServer.getServerName()); + + ReplicationServerSinkPeer sinkPeer = + new ReplicationServerSinkPeer(replicationServer.getServerName(), replAdmin); + replicateWALEntryAndVerify(sinkPeer); + } + + /** + * Requests region server using {@link AsyncReplicationServerAdmin} + */ + @Test + public void testReplicateWAL2() throws Exception { + AsyncClusterConnection conn = + TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection(); + ServerName rs = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0) + .getRegionServer().getServerName(); + AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs); + + ReplicationServerSinkPeer + sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin); + replicateWALEntryAndVerify(sinkPeer); + } + private void replicateWALEntryAndVerify(ReplicationServerSinkPeer sinkPeer) throws Exception { Entry[] entries = new Entry[BATCH_SIZE]; for(int i = 0; i < BATCH_SIZE; i++) { entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i)); } - ReplicationProtbufUtil.replicateWALEntry(rsAdmin, entries, replicationClusterId, - baseNamespaceDir, hfileArchiveDir, 1000); + sinkPeer.replicateWALEntry(entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir, + 1000); + Table table = TEST_UTIL.getConnection().getTable(TABLENAME); for (int i = 0; i < BATCH_SIZE; i++) { - Table table = TEST_UTIL.getConnection().getTable(TABLENAME); Result result = table.get(new Get(Bytes.toBytes(i))); Cell cell = result.getColumnLatestCell(Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY)); assertNotNull(cell); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 41683f98775d..bd2e200f59d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -466,7 +466,7 @@ public void testAbortFalseOnError() throws IOException { String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, + rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(), new MetricsSource(queueId)); try { rs.startup(); @@ -503,7 +503,7 @@ public void testAbortTrueOnError() throws IOException { String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, + rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(), new MetricsSource(queueId)); try { rs.startup();