* It supports hedged reads, which can be enabled by setting
* {@value org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to True. Fan
* out the requests batch is controlled by
* {@value org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}.
- *
+ *
* TODO: Handle changes to the configuration dynamically without having to restart the client.
*/
@InterfaceAudience.Private
public class MasterRegistry implements ConnectionRegistry {
+
+ /** Configuration to enable hedged reads on master registry **/
+ public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
+ "hbase.client.master_registry.enable_hedged_reads";
+
+ /** Default value for enabling hedging reads on master registry **/
+ public static final boolean MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT = false;
+
private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
+ private final boolean hedgedReadsEnabled;
+
// Configured list of masters to probe the meta information from.
- private final Set masterServers;
+ private final Set masterAddrs;
+
+ private final List masterStubs;
// RPC client used to talk to the masters.
private final RpcClient rpcClient;
private final RpcControllerFactory rpcControllerFactory;
- private final int rpcTimeoutMs;
-
- MasterRegistry(Configuration conf) throws UnknownHostException {
- boolean hedgedReadsEnabled = conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY,
- MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT);
- Configuration finalConf;
- if (!hedgedReadsEnabled) {
- // If hedged reads are disabled, it is equivalent to setting a fan out of 1. We make a copy of
- // the configuration so that other places reusing this reference is not affected.
- finalConf = new Configuration(conf);
- finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1);
- } else {
- finalConf = conf;
+
+ /**
+ * Parses the list of master addresses from the provided configuration. Supported format is comma
+ * separated host[:port] values. If no port number if specified, default master port is assumed.
+ * @param conf Configuration to parse from.
+ */
+ private static Set parseMasterAddrs(Configuration conf) throws UnknownHostException {
+ Set masterAddrs = new HashSet<>();
+ String configuredMasters = getMasterAddr(conf);
+ for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
+ HostAndPort masterHostPort =
+ HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
+ masterAddrs.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
}
- if (conf.get(MASTER_ADDRS_KEY) != null) {
- finalConf.set(MASTER_ADDRS_KEY, conf.get(MASTER_ADDRS_KEY));
+ Preconditions.checkArgument(!masterAddrs.isEmpty(), "At least one master address is needed");
+ return masterAddrs;
+ }
+
+ MasterRegistry(Configuration conf) throws IOException {
+ this.hedgedReadsEnabled = conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY,
+ MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT);
+ int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+ conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+ // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
+ // this through the master registry...
+ // This is a problem as we will use the cluster id to determine the authentication method
+ rpcClient = RpcClientFactory.createClient(conf, null);
+ rpcControllerFactory = RpcControllerFactory.instantiate(conf);
+ masterAddrs = parseMasterAddrs(conf);
+ ImmutableList.Builder builder =
+ ImmutableList.builderWithExpectedSize(masterAddrs.size());
+ User user = User.getCurrent();
+ for (ServerName masterAddr : masterAddrs) {
+ builder
+ .add(ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
}
- rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- masterServers = new HashSet<>();
- parseMasterAddrs(finalConf);
- rpcClient = RpcClientFactory.createClient(finalConf, HConstants.CLUSTER_ID_DEFAULT);
- rpcControllerFactory = RpcControllerFactory.instantiate(finalConf);
+ masterStubs = builder.build();
}
/**
* Builds the default master address end point if it is not specified in the configuration.
*/
- public static String getMasterAddr(Configuration conf) throws UnknownHostException {
+ public static String getMasterAddr(Configuration conf) throws UnknownHostException {
String masterAddrFromConf = conf.get(MASTER_ADDRS_KEY);
if (!Strings.isNullOrEmpty(masterAddrFromConf)) {
return masterAddrFromConf;
@@ -117,64 +148,107 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostExcepti
return String.format("%s:%d", hostname, port);
}
- /**
- * @return Stub needed to make RPC using a hedged channel to the master end points.
- */
- private ClientMetaService.Interface getMasterStub() throws IOException {
- return ClientMetaService.newStub(
- rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
+ @FunctionalInterface
+ private interface Callable {
+ void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback done);
}
- /**
- * Parses the list of master addresses from the provided configuration. Supported format is
- * comma separated host[:port] values. If no port number if specified, default master port is
- * assumed.
- * @param conf Configuration to parse from.
- */
- private void parseMasterAddrs(Configuration conf) throws UnknownHostException {
- String configuredMasters = getMasterAddr(conf);
- for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
- HostAndPort masterHostPort =
- HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
- masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
+ private CompletableFuture call(ClientMetaService.Interface stub,
+ Callable callable) {
+ HBaseRpcController controller = rpcControllerFactory.newController();
+ CompletableFuture future = new CompletableFuture<>();
+ callable.call(controller, stub, resp -> {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ future.complete(resp);
+ }
+ });
+ return future;
+ }
+
+ private void tryCompleteExceptionally(CompletableFuture> future, Throwable error,
+ AtomicInteger remaining, Collection errors) {
+ // a simple check to avoid doing expensive exception creation work.
+ if (future.isDone()) {
+ return;
+ }
+ if (remaining.decrementAndGet() == 0) {
+ // a simple check to avoid doing expensive exception creation work.
+ if (future.isDone()) {
+ return;
+ }
+ List exceptions = new ArrayList<>(errors);
+ exceptions.add(error);
+ RetriesExhaustedException ex =
+ new RetriesExhaustedException("masters", masterStubs.size(), exceptions);
+ future.completeExceptionally(new MasterRegistryFetchException(masterAddrs, ex));
+ } else {
+ errors.add(error);
}
- Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed");
}
- @VisibleForTesting
- public Set getParsedMasterServers() {
- return Collections.unmodifiableSet(masterServers);
+ private void tryCompleteExceptionally(CompletableFuture future,
+ Throwable error, int index, Callable callable, Predicate isValidResp, String debug,
+ List errors) {
+ if (index == masterStubs.size() - 1) {
+ errors.add(error);
+ RetriesExhaustedException ex =
+ new RetriesExhaustedException("masters", masterStubs.size(), errors);
+ future.completeExceptionally(new MasterRegistryFetchException(masterAddrs, ex));
+ } else {
+ errors.add(error);
+ call(future, index + 1, callable, isValidResp, debug, errors);
+ }
}
- /**
- * Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once
- * the rpc finishes and the response is propagated to the passed future.
- * @param future Result future to which the rpc response is propagated.
- * @param isValidResp Checks if the rpc response has a valid result.
- * @param transformResult Transforms the result to a different form as expected by callers.
- * @param hrc RpcController instance for this rpc.
- * @param debug Debug message passed along to the caller in case of exceptions.
- * @param RPC result type.
- * @param Transformed type of the result.
- * @return A call back that can be embedded in the non-blocking rpc call.
- */
- private RpcCallback getRpcCallBack(CompletableFuture future,
- Predicate isValidResp, Function transformResult, HBaseRpcController hrc,
- final String debug) {
- return rpcResult -> {
- if (rpcResult == null) {
- future.completeExceptionally(
- new MasterRegistryFetchException(masterServers, hrc.getFailed()));
- return;
+ private IOException badResponse(String debug) {
+ return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
+ }
+
+ private void call(CompletableFuture future, int index,
+ Callable callable, Predicate isValidResp, String debug, List errors) {
+ addListener(call(masterStubs.get(index), callable), (r, e) -> {
+ if (e != null) {
+ tryCompleteExceptionally(future, e, index, callable, isValidResp, debug, errors);
+ } else if (!isValidResp.test(r)) {
+ tryCompleteExceptionally(future, badResponse(debug), index, callable, isValidResp, debug,
+ errors);
+ } else {
+ future.complete(r);
}
- if (!isValidResp.test(rpcResult)) {
- // Rpc returned ok, but result was malformed.
- future.completeExceptionally(new IOException(
- String.format("Invalid result for request %s. Will be retried", debug)));
- return;
+ });
+ }
+
+ private CompletableFuture call(Callable callable,
+ Predicate isValidResp, String debug) {
+ CompletableFuture future = new CompletableFuture<>();
+ if (hedgedReadsEnabled) {
+ // send all requests out concurrently
+ AtomicInteger remaining = new AtomicInteger(masterAddrs.size());
+ ConcurrentLinkedQueue errors = new ConcurrentLinkedQueue<>();
+ for (ClientMetaService.Interface stub : masterStubs) {
+ addListener(call(stub, callable), (r, e) -> {
+ if (e != null) {
+ if (remaining.decrementAndGet() == 0) {
+ tryCompleteExceptionally(future, e, remaining, errors);
+ }
+ } else if (!isValidResp.test(r)) {
+ tryCompleteExceptionally(future, badResponse(debug), remaining, errors);
+ } else {
+ // must complete the future first, otherwise when several response come at almost the
+ // same time, the failure result may complete the future first, which is incorrect.
+ future.complete(r);
+ remaining.decrementAndGet();
+ }
+ });
}
- future.complete(transformResult.apply(rpcResult));
- };
+ } else {
+ // send requests one by one
+ List errors = new ArrayList<>();
+ call(future, 0, callable, isValidResp, debug, errors);
+ }
+ return future;
}
/**
@@ -182,40 +256,25 @@ private RpcCallback getRpcCallBack(CompletableFuture future,
*/
private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
List regionLocations = new ArrayList<>();
- resp.getMetaLocationsList().forEach(
- location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
+ resp.getMetaLocationsList()
+ .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
return new RegionLocations(regionLocations);
}
@Override
public CompletableFuture getMetaRegionLocations() {
- CompletableFuture result = new CompletableFuture<>();
- HBaseRpcController hrc = rpcControllerFactory.newController();
- RpcCallback callback = getRpcCallBack(result,
- (rpcResp) -> rpcResp.getMetaLocationsCount() != 0, this::transformMetaRegionLocations, hrc,
- "getMetaRegionLocations()");
- try {
- getMasterStub().getMetaRegionLocations(
- hrc, GetMetaRegionLocationsRequest.getDefaultInstance(), callback);
- } catch (IOException e) {
- result.completeExceptionally(e);
- }
- return result;
+ return this. call((c, s, d) -> s.getMetaRegionLocations(c,
+ GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
+ "getMetaLocationsCount").thenApply(this::transformMetaRegionLocations);
}
@Override
public CompletableFuture getClusterId() {
- CompletableFuture result = new CompletableFuture<>();
- HBaseRpcController hrc = rpcControllerFactory.newController();
- RpcCallback callback = getRpcCallBack(result,
- GetClusterIdResponse::hasClusterId, GetClusterIdResponse::getClusterId, hrc,
- "getClusterId()");
- try {
- getMasterStub().getClusterId(hrc, GetClusterIdRequest.getDefaultInstance(), callback);
- } catch (IOException e) {
- result.completeExceptionally(e);
- }
- return result;
+ return this
+ . call(
+ (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
+ GetClusterIdResponse::hasClusterId, "getClusterId()")
+ .thenApply(GetClusterIdResponse::getClusterId);
}
private ServerName transformServerName(GetActiveMasterResponse resp) {
@@ -224,17 +283,16 @@ private ServerName transformServerName(GetActiveMasterResponse resp) {
@Override
public CompletableFuture getActiveMaster() {
- CompletableFuture result = new CompletableFuture<>();
- HBaseRpcController hrc = rpcControllerFactory.newController();
- RpcCallback callback = getRpcCallBack(result,
- GetActiveMasterResponse::hasServerName, this::transformServerName, hrc,
- "getActiveMaster()");
- try {
- getMasterStub().getActiveMaster(hrc, GetActiveMasterRequest.getDefaultInstance(), callback);
- } catch (IOException e) {
- result.completeExceptionally(e);
- }
- return result;
+ return this
+ . call(
+ (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
+ GetActiveMasterResponse::hasServerName, "getActiveMaster()")
+ .thenApply(this::transformServerName);
+ }
+
+ @VisibleForTesting
+ Set getParsedMasterServers() {
+ return masterAddrs;
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index b0baff23d512..bf2f36157359 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -20,12 +20,12 @@
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
-import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -47,6 +47,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
@@ -60,6 +61,7 @@
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
/**
@@ -512,13 +514,6 @@ public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
}
- @Override
- public RpcChannel createHedgedRpcChannel(Set sns, User user, int rpcTimeout)
- throws UnknownHostException {
- // Check HedgedRpcChannel implementation for detailed comments.
- throw new UnsupportedOperationException("Hedging not supported for this implementation.");
- }
-
private static class AbstractRpcChannel {
protected final InetSocketAddress addr;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java
deleted file mode 100644
index 7b681e079bdf..000000000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.PrettyPrinter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-
-/**
- * A non-blocking implementation of RpcChannel that hedges requests to multiple service end points.
- * First received response is returned to the caller. This abstracts out the logic needed to batch
- * requests to multiple end points underneath and presents itself as a single logical RpcChannel to
- * the client.
- *
- * Hedging Details:
- * ---------------
- * - Hedging of RPCs happens in multiple batches. In each iteration, we select a 'batch' of address
- * end points to make the call to. We do multiple iterations until we get a proper response to the
- * rpc call or all the service addresses are exhausted, which ever happens first. Size of each is
- * configurable and is also known as 'fanOutSize'.
- *
- * - We randomize the addresses up front so that the batch order per client is non deterministic.
- * This avoids hot spots on the service side. The size of each batch is controlled via 'fanOutSize'.
- * Higher fanOutSize implies we make more rpc calls in a single batch. One needs to mindful of the
- * load on the client and server side when configuring the fan out.
- *
- * - In a happy case, once we receive a response from one end point, we cancel all the
- * other inflight rpcs in the same batch and return the response to the caller. If we do not get a
- * valid response from any address end point, we propagate the error back to the caller.
- *
- * - Rpc timeouts are applied to every hedged rpc.
- *
- * - Callers need to be careful about what rpcs they are trying to hedge. Not every kind of call can
- * be hedged (for example: cluster state changing rpcs).
- *
- * (TODO) Retries and Adaptive hedging policy:
- * ------------------------------------------
- *
- * - No retries are handled at the channel level. Retries can be built in upper layers. However the
- * question is, do we even need retries? Hedging in fact is a substitute for retries.
- *
- * - Clearly hedging puts more load on the service side. To mitigate this, we can make the hedging
- * policy more adaptive. In most happy cases, the rpcs from the first few end points should return
- * right away (especially short lived rpcs, that do not take up much time). In such cases, hedging
- * is not needed. So, the idea is to make this request pattern pluggable so that the requests are
- * hedged only when needed.
- */
-class HedgedRpcChannel implements RpcChannel {
- private static final Logger LOG = LoggerFactory.getLogger(HedgedRpcChannel.class);
-
- /**
- * Currently hedging is only supported for non-blocking connection implementation types because
- * the channel implementation inherently relies on the connection implementation being async.
- * Refer to the comments in doCallMethod().
- */
- private final NettyRpcClient rpcClient;
- // List of service addresses to hedge the requests to.
- private final List addrs;
- private final User ticket;
- private final int rpcTimeout;
- // Controls the size of request fan out (number of rpcs per a single batch).
- private final int fanOutSize;
-
- /**
- * A simple rpc call back implementation to notify the batch context if any rpc is successful.
- */
- private static class BatchRpcCtxCallBack implements RpcCallback {
- private final BatchRpcCtx batchRpcCtx;
- private final HBaseRpcController rpcController;
- BatchRpcCtxCallBack(BatchRpcCtx batchRpcCtx, HBaseRpcController rpcController) {
- this.batchRpcCtx = batchRpcCtx;
- this.rpcController = rpcController;
- }
- @Override
- public void run(Message result) {
- batchRpcCtx.setResultIfNotSet(result, rpcController);
- }
- }
-
- /**
- * A shared RPC context between a batch of hedged RPCs. Tracks the state and helpers needed to
- * synchronize on multiple RPCs to different end points fetching the result. All the methods are
- * thread-safe.
- */
- private static class BatchRpcCtx {
- // Result set by the thread finishing first. Set only once.
- private final AtomicReference result = new AtomicReference<>();
- // Caller waits on this latch being set.
- // We set this to 1, so that the first successful RPC result is returned to the client.
- private CountDownLatch resultsReady = new CountDownLatch(1);
- // Failed rpc book-keeping.
- private AtomicInteger failedRpcCount = new AtomicInteger();
- // All the call handles for this batch.
- private final List callsInFlight = Collections.synchronizedList(new ArrayList<>());
-
- // Target addresses.
- private final List addresses;
- // Called when the result is ready.
- private final RpcCallback callBack;
- // Last failed rpc's exception. Used to propagate the reason to the controller.
- private IOException lastFailedRpcReason;
-
-
- BatchRpcCtx(List addresses, RpcCallback callBack) {
- this.addresses = addresses;
- this.callBack = Preconditions.checkNotNull(callBack);
- }
-
- /**
- * Sets the result only if it is not already set by another thread. Thread that successfully
- * sets the result also count downs the latch.
- * @param result Result to be set.
- */
- public void setResultIfNotSet(Message result, HBaseRpcController rpcController) {
- if (rpcController.failed()) {
- incrementFailedRpcs(rpcController.getFailed());
- return;
- }
- if (this.result.compareAndSet(null, result)) {
- resultsReady.countDown();
- // Cancel all pending in flight calls.
- for (Call call: callsInFlight) {
- // It is ok to do it for all calls as it is a no-op if the call is already done.
- final String exceptionMsg = String.format("%s canceled because another hedged attempt " +
- "for the same rpc already succeeded. This is not needed anymore.", call);
- call.setException(new CallCancelledException(exceptionMsg));
- }
- }
- }
-
- /**
- * Waits until the results are populated and calls the callback if the call is successful.
- * @return true for successful rpc and false otherwise.
- */
- public boolean waitForResults() {
- try {
- // We do not set a timeout on await() because we rely on the underlying RPCs to timeout if
- // something on the remote is broken. Worst case we should wait for rpc time out to kick in.
- resultsReady.await();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for batched master RPC results. Aborting wait.", e);
- }
- Message message = result.get();
- if (message != null) {
- callBack.run(message);
- return true;
- }
- return false;
- }
-
- public void addCallInFlight(Call c) {
- callsInFlight.add(c);
- }
-
- public void incrementFailedRpcs(IOException reason) {
- if (failedRpcCount.incrementAndGet() == addresses.size()) {
- lastFailedRpcReason = reason;
- // All the rpcs in this batch have failed. Invoke the waiting threads.
- resultsReady.countDown();
- }
- }
-
- public IOException getLastFailedRpcReason() {
- return lastFailedRpcReason;
- }
-
- @Override
- public String toString() {
- return String.format("Batched rpc for target(s) %s", PrettyPrinter.toString(addresses));
- }
- }
-
- public HedgedRpcChannel(NettyRpcClient rpcClient, Set addrs,
- User ticket, int rpcTimeout, int fanOutSize) {
- this.rpcClient = rpcClient;
- this.addrs = new ArrayList<>(Preconditions.checkNotNull(addrs));
- Preconditions.checkArgument(this.addrs.size() >= 1);
- // For non-deterministic client query pattern. Not all clients want to hedge RPCs in the same
- // order, creating hot spots on the service end points.
- Collections.shuffle(this.addrs);
- this.ticket = ticket;
- this.rpcTimeout = rpcTimeout;
- // fanOutSize controls the number of hedged RPCs per batch.
- this.fanOutSize = fanOutSize;
- }
-
- private HBaseRpcController applyRpcTimeout(RpcController controller) {
- HBaseRpcController hBaseRpcController = (HBaseRpcController) controller;
- int rpcTimeoutToSet =
- hBaseRpcController.hasCallTimeout() ? hBaseRpcController.getCallTimeout() : rpcTimeout;
- HBaseRpcController response = new HBaseRpcControllerImpl();
- response.setCallTimeout(rpcTimeoutToSet);
- return response;
- }
-
- private void doCallMethod(Descriptors.MethodDescriptor method, HBaseRpcController controller,
- Message request, Message responsePrototype, RpcCallback done) {
- int i = 0;
- BatchRpcCtx lastBatchCtx = null;
- while (i < addrs.size()) {
- // Each iteration picks fanOutSize addresses to run as batch.
- int batchEnd = Math.min(addrs.size(), i + fanOutSize);
- List addrSubList = addrs.subList(i, batchEnd);
- BatchRpcCtx batchRpcCtx = new BatchRpcCtx(addrSubList, done);
- lastBatchCtx = batchRpcCtx;
- LOG.debug("Attempting request {}, {}", method.getName(), batchRpcCtx);
- for (InetSocketAddress address : addrSubList) {
- HBaseRpcController rpcController = applyRpcTimeout(controller);
- // ** WARN ** This is a blocking call if the underlying connection for the rpc client is
- // a blocking implementation (ex: BlockingRpcConnection). That essentially serializes all
- // the write calls. Handling blocking connection means that this should be run in a separate
- // thread and hence more code complexity. Is it ok to handle only non-blocking connections?
- batchRpcCtx.addCallInFlight(rpcClient.callMethod(method, rpcController, request,
- responsePrototype, ticket, address,
- new BatchRpcCtxCallBack(batchRpcCtx, rpcController)));
- }
- if (batchRpcCtx.waitForResults()) {
- return;
- }
- // Entire batch has failed, lets try the next batch.
- LOG.debug("Failed request {}, {}.", method.getName(), batchRpcCtx);
- i = batchEnd;
- }
- Preconditions.checkNotNull(lastBatchCtx);
- // All the batches failed, mark it a failed rpc.
- // Propagate the failure reason. We propagate the last batch's last failing rpc reason.
- // Can we do something better?
- controller.setFailed(lastBatchCtx.getLastFailedRpcReason());
- done.run(null);
- }
-
- @Override
- public void callMethod(Descriptors.MethodDescriptor method, RpcController controller,
- Message request, Message responsePrototype, RpcCallback done) {
- // There is no reason to use any other implementation of RpcController.
- Preconditions.checkState(controller instanceof HBaseRpcController);
- // To make the channel non-blocking, we run the actual doCalMethod() async. The call back is
- // called once the hedging finishes.
- CompletableFuture.runAsync(
- () -> doCallMethod(method, (HBaseRpcController)controller, request, responsePrototype, done));
- }
-}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
index f7a65e4d6fff..4c85e3d51abe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
@@ -18,20 +18,14 @@
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.util.HashSet;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
@@ -82,19 +76,6 @@ protected NettyRpcConnection createConnection(ConnectionId remoteId) throws IOEx
return new NettyRpcConnection(this, remoteId);
}
- @Override
- public RpcChannel createHedgedRpcChannel(Set sns, User user, int rpcTimeout)
- throws UnknownHostException {
- final int hedgedRpcFanOut = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
- HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
- Set addresses = new HashSet<>();
- for (ServerName sn: sns) {
- addresses.add(createAddr(sn));
- }
- return new HedgedRpcChannel(this, addresses, user, rpcTimeout,
- hedgedRpcFanOut);
- }
-
@Override
protected void closeInternal() {
if (shutdownGroupWhenClose) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 558fceeb78bb..877d9b0d5b90 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -19,10 +19,10 @@
import java.io.Closeable;
import java.io.IOException;
-import java.util.Set;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
+
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
@@ -82,16 +82,6 @@ BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTim
RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout)
throws IOException;
- /**
- * Creates a channel that can hedge request to multiple underlying channels.
- * @param sns Set of servers for underlying channels.
- * @param user user for the connection.
- * @param rpcTimeout rpc timeout to use.
- * @return A hedging rpc channel for this rpc client instance.
- */
- RpcChannel createHedgedRpcChannel(final Set sns, final User user, int rpcTimeout)
- throws IOException;
-
/**
* Interrupt the connections to the given server. This should be called if the server
* is known as actually dead. This will not prevent current operation to be retried, and,
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index d518d217ba3b..c1853500da77 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -185,13 +185,6 @@ public enum OperationStatusCode {
public static final String ZK_CONNECTION_REGISTRY_CLASS =
"org.apache.hadoop.hbase.client.ZKConnectionRegistry";
- /** Configuration to enable hedged reads on master registry **/
- public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
- "hbase.client.master_registry.enable_hedged_reads";
-
- /** Default value for enabling hedging reads on master registry **/
- public static final boolean MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT = false;
-
/** Parameter name for the master type being backup (waits for primary to go inactive). */
public static final String MASTER_TYPE_BACKUP = "hbase.master.backup";
@@ -917,12 +910,6 @@ public enum OperationStatusCode {
*/
public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
- /** Configuration key that controls the fan out of requests in hedged channel implementation. **/
- public static final String HBASE_RPCS_HEDGED_REQS_FANOUT_KEY = "hbase.rpc.hedged.fanout";
-
- /** Default value for the fan out of hedged requests. **/
- public static final int HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT = 2;
-
/**
* timeout for each read RPC
*/
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java
index 4b699ab470b9..97563138db31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java
@@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
@@ -53,6 +54,7 @@
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
@@ -86,20 +88,18 @@ class FromClientSideBase {
* to initialize from scratch. While this is a hack, it saves a ton of time for the full
* test and de-flakes it.
*/
- protected static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) {
+ protected static boolean isSameParameterizedCluster(Class> registryImpl, int numHedgedReqs) {
if (TEST_UTIL == null) {
return false;
}
Configuration conf = TEST_UTIL.getConfiguration();
- Class confClass = conf.getClass(
- HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
- int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
- HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
- return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
+ Class> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+ ZKConnectionRegistry.class);
+ return confClass.getName().equals(registryImpl.getName());
}
- protected static final void initialize(Class registryImpl, int numHedgedReqs, Class>... cps)
- throws Exception {
+ protected static final void initialize(Class> registryImpl, int numHedgedReqs, Class>... cps)
+ throws Exception {
// initialize() is called for every unit test, however we only want to reset the cluster state
// at the end of every parameterized run.
if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
@@ -124,12 +124,11 @@ protected static final void initialize(Class registryImpl, int numHedgedReqs, Cl
conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
ConnectionRegistry.class);
if (numHedgedReqs == 1) {
- conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
+ conf.setBoolean(MasterRegistry.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
} else {
Preconditions.checkArgument(numHedgedReqs > 1);
- conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
+ conf.setBoolean(MasterRegistry.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
}
- conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
// Multiple masters needed only when hedged reads for master registry are enabled.
builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(SLAVES);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index a7991c703550..71f5447cd728 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -19,11 +19,11 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
-import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -45,7 +45,7 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -146,14 +146,6 @@ public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout)
throws UnknownHostException {
return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout);
}
-
- @Override
- public RpcChannel createHedgedRpcChannel(Set sns, User user, int rpcTimeout)
- throws UnknownHostException {
- Preconditions.checkArgument(sns != null && sns.size() == 1);
- return new RandomTimeoutRpcChannel(this, (ServerName)sns.toArray()[0], user, rpcTimeout);
- }
-
}
private static AtomicInteger invokations = new AtomicInteger();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
index 65b2d0bc956e..aebe1845bf03 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
@@ -19,7 +19,8 @@
import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
import static org.junit.Assert.assertEquals;
-import java.net.UnknownHostException;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -77,14 +78,15 @@ private static String generateDummyMastersList(int size) {
/**
* Makes sure the master registry parses the master end points in the configuration correctly.
*/
- @Test public void testMasterAddressParsing() throws UnknownHostException {
+ @Test
+ public void testMasterAddressParsing() throws IOException {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
int numMasters = 10;
conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters));
try (MasterRegistry registry = new MasterRegistry(conf)) {
List parsedMasters = new ArrayList<>(registry.getParsedMasterServers());
// Half of them would be without a port, duplicates are removed.
- assertEquals(numMasters/2 + 1, parsedMasters.size());
+ assertEquals(numMasters / 2 + 1, parsedMasters.size());
// Sort in the increasing order of port numbers.
Collections.sort(parsedMasters, Comparator.comparingInt(ServerName::getPort));
for (int i = 0; i < parsedMasters.size(); i++) {
@@ -100,18 +102,18 @@ private static String generateDummyMastersList(int size) {
}
}
- @Test public void testRegistryRPCs() throws Exception {
+ @Test
+ public void testRegistryRPCs() throws Exception {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
- final int size = activeMaster.getMetaRegionLocationCache().
- getMetaRegionLocations().get().size();
- for (int numHedgedReqs = 1; numHedgedReqs <= 3; numHedgedReqs++) {
+ final int size =
+ activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get().size();
+ for (int numHedgedReqs = 1; numHedgedReqs <= size; numHedgedReqs++) {
if (numHedgedReqs == 1) {
- conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
+ conf.setBoolean(MasterRegistry.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
} else {
- conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
+ conf.setBoolean(MasterRegistry.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
}
- conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
try (MasterRegistry registry = new MasterRegistry(conf)) {
// Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
// because not all replicas had made it up before test started.
@@ -119,9 +121,9 @@ private static String generateDummyMastersList(int size) {
assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
List metaLocations =
- Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
- List actualMetaLocations = activeMaster.getMetaRegionLocationCache()
- .getMetaRegionLocations().get();
+ Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
+ List actualMetaLocations =
+ activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
Collections.sort(metaLocations);
Collections.sort(actualMetaLocations);
assertEquals(actualMetaLocations, metaLocations);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index c045c606b785..6c976c0c992a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -101,7 +101,7 @@ public static void tearDownAfterClass() throws Exception {
}
@Parameterized.Parameters
- public static Collection parameters() {
+ public static Collection