Skip to content

Commit

Permalink
address review comments, and tests for hedged reads
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed May 5, 2020
1 parent 6492757 commit 1055818
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -48,7 +50,7 @@
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
Expand All @@ -66,10 +68,9 @@
* Master based registry implementation. Makes RPCs to the configured master addresses from config
* {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
* <p/>
* It supports hedged reads, which can be enabled by setting
* {@value org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to True. Fan
* out the requests batch is controlled by
* {@value org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}.
* It supports hedged reads, which are enabled by setting the fan out of each request batch,
* {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY}, to a value greater than {@code 1} (the default
* value is {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT}).
* <p/>
* TODO: Handle changes to the configuration dynamically without having to restart the client.
*/
Expand All @@ -88,9 +89,7 @@ public class MasterRegistry implements ConnectionRegistry {
private final int hedgedReadFanOut;

// Configured list of masters to probe the meta information from.
private final Set<ServerName> masterAddrs;

private final List<ClientMetaService.Interface> masterStubs;
private final ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;

// RPC client used to talk to the masters.
private final RpcClient rpcClient;
Expand All @@ -114,29 +113,32 @@ private static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unkno
}

MasterRegistry(Configuration conf) throws IOException {
this.hedgedReadFanOut = conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
// XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
// this through the master registry...
// This is a problem as we will use the cluster id to determine the authentication method
rpcClient = RpcClientFactory.createClient(conf, null);
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
masterAddrs = parseMasterAddrs(conf);
ImmutableList.Builder<ClientMetaService.Interface> builder =
ImmutableList.builderWithExpectedSize(masterAddrs.size());
Set<ServerName> masterAddrs = parseMasterAddrs(conf);
ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
ImmutableMap.builderWithExpectedSize(masterAddrs.size());
User user = User.getCurrent();
for (ServerName masterAddr : masterAddrs) {
builder
.add(ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
builder.put(masterAddr,
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
}
masterStubs = builder.build();
masterAddr2Stub = builder.build();
}

/**
* Builds the default master address end point if it is not specified in the configuration.
* <p/>
* Will be called in {@code HBaseTestingUtility}.
*/
@VisibleForTesting
public static String getMasterAddr(Configuration conf) throws UnknownHostException {
String masterAddrFromConf = conf.get(MASTER_ADDRS_KEY);
if (!Strings.isNullOrEmpty(masterAddrFromConf)) {
Expand All @@ -147,6 +149,15 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostExcepti
return String.format("%s:%d", hostname, port);
}

/**
* For describing the actual asynchronous rpc call.
* <p/>
* Typically, you can use a lambda expression to implement this interface as
*
* <pre>
* (c, s, d) -> s.xxx(c, <your request here>, d)
* </pre>
*/
@FunctionalInterface
private interface Callable<T> {
void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
Expand All @@ -170,10 +181,13 @@ private IOException badResponse(String debug) {
return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
}

// send requests concurrently to hedgedReadsFanout masters
private <T extends Message> void groupCall(CompletableFuture<T> future, int startIndexInclusive,
Callable<T> callable, Predicate<T> isValidResp, String debug,
ConcurrentLinkedQueue<Throwable> errors) {
// Send requests concurrently to hedgedReadFanOut masters. If any of the requests succeeds, we
// will complete the future and quit. If all the requests in one round fail, we will start
// another round and send requests concurrently to the next hedgedReadFanOut masters. If all
// masters have been tried and all of them have failed, we will fail the future.
private <T extends Message> void groupCall(CompletableFuture<T> future,
List<ClientMetaService.Interface> masterStubs, int startIndexInclusive, Callable<T> callable,
Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
Expand All @@ -193,9 +207,11 @@ private <T extends Message> void groupCall(CompletableFuture<T> future, int star
// we are done, complete the future with exception
RetriesExhaustedException ex = new RetriesExhaustedException("masters",
masterStubs.size(), new ArrayList<>(errors));
future.completeExceptionally(new MasterRegistryFetchException(masterAddrs, ex));
future.completeExceptionally(
new MasterRegistryFetchException(masterAddr2Stub.keySet(), ex));
} else {
groupCall(future, endIndexExclusive, callable, isValidResp, debug, errors);
groupCall(future, masterStubs, endIndexExclusive, callable, isValidResp, debug,
errors);
}
}
} else {
Expand All @@ -208,9 +224,10 @@ private <T extends Message> void groupCall(CompletableFuture<T> future, int star

private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
Predicate<T> isValidResp, String debug) {
List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2Stub.values());
Collections.shuffle(masterStubs, ThreadLocalRandom.current());
CompletableFuture<T> future = new CompletableFuture<>();
ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
groupCall(future, 0, callable, isValidResp, debug, errors);
groupCall(future, masterStubs, 0, callable, isValidResp, debug, new ConcurrentLinkedQueue<>());
return future;
}

Expand Down Expand Up @@ -255,7 +272,7 @@ public CompletableFuture<ServerName> getActiveMaster() {

@VisibleForTesting
Set<ServerName> getParsedMasterServers() {
return masterAddrs;
return masterAddr2Stub.keySet();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
*/
@InterfaceAudience.Private
public class MasterRegistryFetchException extends HBaseIOException {

private static final long serialVersionUID = 6992134872168185171L;

public MasterRegistryFetchException(Set<ServerName> masters, Throwable failure) {
super(String.format("Exception making rpc to masters %s", PrettyPrinter.toString(masters)),
failure);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;

@Category({ ClientTests.class, SmallTests.class })
public class TestMasterRegistryHedgedReads {

private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class);

private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();

private static final ExecutorService EXECUTOR =
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());

private static AtomicInteger CALLED = new AtomicInteger(0);

private static volatile int BAD_RESP_INDEX;

private static volatile Set<Integer> GOOD_RESP_INDEXS;

private static GetClusterIdResponse RESP =
GetClusterIdResponse.newBuilder().setClusterId("id").build();

public static final class RpcClientImpl implements RpcClient {

public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress,
MetricsConnection metrics) {
}

@Override
public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout)
throws IOException {
throw new UnsupportedOperationException();
}

@Override
public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
throws IOException {
return new RpcChannelImpl();
}

@Override
public void cancelConnections(ServerName sn) {
}

@Override
public void close() {
}

@Override
public boolean hasCellBlockSupport() {
return false;
}
}

public static final class RpcChannelImpl implements RpcChannel {

@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
// simulate the asynchronous behavior otherwise all logic will perform in the same thread...
EXECUTOR.execute(() -> {
int index = CALLED.getAndIncrement();
if (index == BAD_RESP_INDEX) {
done.run(GetClusterIdResponse.getDefaultInstance());
} else if (GOOD_RESP_INDEXS.contains(index)) {
done.run(RESP);
} else {
((HBaseRpcController) controller).setFailed("inject error");
done.run(null);
}
});
}
}

@BeforeClass
public static void setUpBeforeClass() {
Configuration conf = UTIL.getConfiguration();
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
RpcClient.class);
String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i))
.collect(Collectors.joining(","));
conf.set(HConstants.MASTER_ADDRS_KEY, masters);
}

@AfterClass
public static void tearDownAfterClass() {
EXECUTOR.shutdownNow();
}

@Before
public void setUp() {
CALLED.set(0);
BAD_RESP_INDEX = -1;
GOOD_RESP_INDEXS = Collections.emptySet();
}

private <T> T logIfError(CompletableFuture<T> future) throws IOException {
try {
return FutureUtils.get(future);
} catch (Throwable t) {
LOG.warn("", t);
throw t;
}
}

@Test
public void testAllFailNoHedged() throws IOException {
Configuration conf = UTIL.getConfiguration();
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1);
try (MasterRegistry registry = new MasterRegistry(conf)) {
assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
assertEquals(10, CALLED.get());
}
}

@Test
public void testAllFailHedged3() throws IOException {
Configuration conf = UTIL.getConfiguration();
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3);
BAD_RESP_INDEX = 5;
try (MasterRegistry registry = new MasterRegistry(conf)) {
assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
assertEquals(10, CALLED.get());
}
}

@Test
public void testFirstSucceededNoHedge() throws IOException {
Configuration conf = UTIL.getConfiguration();
// will be set to 1
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0);
GOOD_RESP_INDEXS =
IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet());
try (MasterRegistry registry = new MasterRegistry(conf)) {
String clusterId = logIfError(registry.getClusterId());
assertEquals(RESP.getClusterId(), clusterId);
assertEquals(1, CALLED.get());
}
}

@Test
public void testSecondRoundSucceededHedge4() throws IOException {
Configuration conf = UTIL.getConfiguration();
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
GOOD_RESP_INDEXS = Collections.singleton(6);
try (MasterRegistry registry = new MasterRegistry(conf)) {
String clusterId = logIfError(registry.getClusterId());
assertEquals(RESP.getClusterId(), clusterId);
UTIL.waitFor(5000, () -> CALLED.get() == 8);
}
}

@Test
public void testSucceededWithLargestHedged() throws IOException, InterruptedException {
Configuration conf = UTIL.getConfiguration();
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE);
GOOD_RESP_INDEXS = Collections.singleton(5);
try (MasterRegistry registry = new MasterRegistry(conf)) {
String clusterId = logIfError(registry.getClusterId());
assertEquals(RESP.getClusterId(), clusterId);
UTIL.waitFor(5000, () -> CALLED.get() == 10);
Thread.sleep(1000);
// make sure we do not send more
assertEquals(10, CALLED.get());
}
}
}

0 comments on commit 1055818

Please sign in to comment.