diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index b0472a0f1b47..d51eb0ab9ad7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -41,7 +41,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
@@ -257,10 +256,6 @@ ClientService.Interface getRegionServerStub(ServerName serverName) throws IOExce
() -> createRegionServerStub(serverName));
}
- private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
- return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
- }
-
private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}
@@ -272,26 +267,8 @@ AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
}
CompletableFuture getMasterStub() {
- return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
- CompletableFuture future = new CompletableFuture<>();
- addListener(registry.getActiveMaster(), (addr, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- } else if (addr == null) {
- future.completeExceptionally(new MasterNotRunningException(
- "ZooKeeper available but no active master location found"));
- } else {
- LOG.debug("The fetched master address is {}", addr);
- try {
- future.complete(createMasterStub(addr));
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
- }
-
- });
- return future;
- }, stub -> true, "master stub");
+ return ConnectionUtils.getMasterStub(registry, masterStub, masterStubMakeFuture, rpcClient,
+ user, rpcTimeout, TimeUnit.MILLISECONDS, MasterService::newStub, "MasterService");
}
String getClusterId() {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java
index c8ffa2418319..a1ccb8c04fa0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java
@@ -19,32 +19,12 @@
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
-import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaCellComparator;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService.Interface;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
/**
* The class for locating region for meta table.
@@ -52,104 +32,27 @@
@InterfaceAudience.Private
class AsyncMetaTableRegionLocator extends AbstractAsyncTableRegionLocator {
- private static final Logger LOG = LoggerFactory.getLogger(AsyncMetaTableRegionLocator.class);
-
- private final AtomicReference stub = new AtomicReference<>();
-
- private final AtomicReference> stubMakeFuture =
- new AtomicReference<>();
-
AsyncMetaTableRegionLocator(AsyncConnectionImpl conn, TableName tableName, int maxConcurrent) {
// for meta region we should use MetaCellComparator to compare the row keys
- super(conn, tableName, maxConcurrent, (r1, r2) -> MetaCellComparator
- .compareRows(r1, 0, r1.length, r2, 0, r2.length));
- }
-
- private Interface createStub(ServerName serverName) throws IOException {
- return ClientMetaService.newStub(conn.rpcClient.createRpcChannel(serverName, conn.user,
- (int) TimeUnit.NANOSECONDS.toMillis(conn.connConf.getReadRpcTimeoutNs())));
- }
-
- CompletableFuture getStub() {
- return ConnectionUtils.getOrFetch(stub, stubMakeFuture, false, () -> {
- CompletableFuture future = new CompletableFuture<>();
- addListener(conn.registry.getActiveMaster(), (addr, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- } else if (addr == null) {
- future.completeExceptionally(new MasterNotRunningException(
- "ZooKeeper available but no active master location found"));
- } else {
- LOG.debug("The fetched master address is {}", addr);
- try {
- future.complete(createStub(addr));
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
- }
-
- });
- return future;
- }, stub -> true, "ClientLocateMetaStub");
- }
-
- private void tryClearMasterStubCache(IOException error, Interface currentStub) {
- if (ClientExceptionsUtil.isConnectionException(error) ||
- error instanceof ServerNotRunningYetException) {
- stub.compareAndSet(currentStub, null);
- }
+ super(conn, tableName, maxConcurrent, MetaCellComparator.ROW_COMPARATOR);
}
@Override
protected void locate(LocateRequest req) {
- addListener(getStub(), (stub, error) -> {
+ addListener(conn.registry.locateMeta(req.row, req.locateType), (locs, error) -> {
if (error != null) {
onLocateComplete(req, null, error);
return;
}
- HBaseRpcController controller = conn.rpcControllerFactory.newController();
- stub.locateMetaRegion(controller,
- LocateMetaRegionRequest.newBuilder().setRow(ByteString.copyFrom(req.row))
- .setLocateType(ProtobufUtil.toProtoRegionLocateType(req.locateType)).build(),
- resp -> {
- if (controller.failed()) {
- IOException ex = controller.getFailed();
- tryClearMasterStubCache(ex, stub);
- onLocateComplete(req, null, ex);
- return;
- }
- RegionLocations locs = new RegionLocations(resp.getMetaLocationsList().stream()
- .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList()));
- if (validateRegionLocations(locs, req)) {
- onLocateComplete(req, locs, null);
- }
- });
+ if (validateRegionLocations(locs, req)) {
+ onLocateComplete(req, locs, null);
+ }
});
}
@Override
CompletableFuture>
getAllRegionLocations(boolean excludeOfflinedSplitParents) {
- CompletableFuture> future = new CompletableFuture<>();
- addListener(getStub(), (stub, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- HBaseRpcController controller = conn.rpcControllerFactory.newController();
- stub.getAllMetaRegionLocations(controller, GetAllMetaRegionLocationsRequest.newBuilder()
- .setExcludeOfflinedSplitParents(excludeOfflinedSplitParents).build(), resp -> {
- if (controller.failed()) {
- IOException ex = controller.getFailed();
- tryClearMasterStubCache(ex, stub);
- future.completeExceptionally(ex);
- return;
- }
- List locs = resp.getMetaLocationsList().stream()
- .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList());
- future.complete(locs);
- });
- });
- return future;
+ return conn.registry.getAllMetaRegionLocations(excludeOfflinedSplitParents);
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
index 569d728fc700..12222681aa84 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
@@ -18,21 +18,36 @@
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Registry for meta information needed for connection setup to a HBase cluster. Implementations
- * hold cluster information such as this cluster's id, location of hbase:meta, etc..
- * Internal use only.
+ * hold cluster information such as this cluster's id, location of hbase:meta, etc.. Internal use
+ * only.
*/
@InterfaceAudience.Private
interface ConnectionRegistry extends Closeable {
+ /**
+ * Get location of meta region for the given {@code row}.
+ */
+ CompletableFuture locateMeta(byte[] row, RegionLocateType locateType);
+
+ /**
+ * Get all meta region locations, including the location of secondary regions.
+ * @param excludeOfflinedSplitParents whether to include split parent.
+ */
+ CompletableFuture>
+ getAllMetaRegionLocations(boolean excludeOfflinedSplitParents);
+
/**
* Should only be called once.
- *
+ *
* The upper layer should store this value somewhere as it will not be change any more.
*/
CompletableFuture getClusterId();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 94ab7fac3fd8..86d6442f8fb7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -28,6 +28,8 @@
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -35,20 +37,26 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
-import java.util.function.Predicate;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.ipc.RemoteException;
@@ -59,6 +67,7 @@
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
@@ -68,6 +77,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
/**
* Utility used by client connections.
@@ -114,7 +125,7 @@ public static long getPauseTime(final long pause, final int tries) {
* @param log Used to log what we set in here.
*/
public static void setServerSideHConnectionRetriesConfig(final Configuration c, final String sn,
- final Logger log) {
+ final Logger log) {
// TODO: Fix this. Not all connections from server side should have 10 times the retries.
int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
@@ -194,11 +205,18 @@ static boolean isEmptyStopRow(byte[] row) {
return Bytes.equals(row, EMPTY_END_ROW);
}
+ private static int nanosToMillis(long nanos) {
+ return toIntNoOverflow(TimeUnit.NANOSECONDS.toMillis(nanos));
+ }
+
+ private static int toIntNoOverflow(long value) {
+ return (int) Math.min(Integer.MAX_VALUE, value);
+ }
+
static void resetController(HBaseRpcController controller, long timeoutNs, int priority) {
controller.reset();
if (timeoutNs >= 0) {
- controller.setCallTimeout(
- (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs)));
+ controller.setCallTimeout(nanosToMillis(timeoutNs));
}
controller.setPriority(priority);
}
@@ -355,7 +373,7 @@ static void incRPCRetriesMetrics(ScanMetrics scanMetrics, boolean isRegionServer
}
static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs,
- boolean isRegionServerRemote) {
+ boolean isRegionServerRemote) {
if (scanMetrics == null || rrs == null || rrs.length == 0) {
return;
}
@@ -398,7 +416,7 @@ static void incRegionCountMetrics(ScanMetrics scanMetrics) {
* increase the hedge read related metrics.
*/
private static void connect(CompletableFuture srcFuture, CompletableFuture dstFuture,
- Optional metrics) {
+ Optional metrics) {
addListener(srcFuture, (r, e) -> {
if (e != null) {
dstFuture.completeExceptionally(e);
@@ -417,8 +435,8 @@ private static void connect(CompletableFuture srcFuture, CompletableFutur
}
private static void sendRequestsToSecondaryReplicas(
- Function> requestReplica, RegionLocations locs,
- CompletableFuture future, Optional metrics) {
+ Function> requestReplica, RegionLocations locs,
+ CompletableFuture future, Optional metrics) {
if (future.isDone()) {
// do not send requests to secondary replicas if the future is done, i.e, the primary request
// has already been finished.
@@ -432,9 +450,9 @@ private static void sendRequestsToSecondaryReplicas(
}
static CompletableFuture timelineConsistentRead(AsyncRegionLocator locator,
- TableName tableName, Query query, byte[] row, RegionLocateType locateType,
- Function> requestReplica, long rpcTimeoutNs,
- long primaryCallTimeoutNs, Timer retryTimer, Optional metrics) {
+ TableName tableName, Query query, byte[] row, RegionLocateType locateType,
+ Function> requestReplica, long rpcTimeoutNs,
+ long primaryCallTimeoutNs, Timer retryTimer, Optional metrics) {
if (query.getConsistency() != Consistency.TIMELINE) {
return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
@@ -521,52 +539,8 @@ static int getPriority(TableName tableName) {
}
}
- static CompletableFuture getOrFetch(AtomicReference cacheRef,
- AtomicReference> futureRef, boolean reload,
- Supplier> fetch, Predicate validator, String type) {
- for (;;) {
- if (!reload) {
- T value = cacheRef.get();
- if (value != null && validator.test(value)) {
- return CompletableFuture.completedFuture(value);
- }
- }
- LOG.trace("{} cache is null, try fetching from registry", type);
- if (futureRef.compareAndSet(null, new CompletableFuture<>())) {
- LOG.debug("Start fetching {} from registry", type);
- CompletableFuture future = futureRef.get();
- addListener(fetch.get(), (value, error) -> {
- if (error != null) {
- LOG.debug("Failed to fetch {} from registry", type, error);
- futureRef.getAndSet(null).completeExceptionally(error);
- return;
- }
- LOG.debug("The fetched {} is {}", type, value);
- // Here we update cache before reset future, so it is possible that someone can get a
- // stale value. Consider this:
- // 1. update cacheRef
- // 2. someone clears the cache and relocates again
- // 3. the futureRef is not null so the old future is used.
- // 4. we clear futureRef and complete the future in it with the value being
- // cleared in step 2.
- // But we do not think it is a big deal as it rarely happens, and even if it happens, the
- // caller will retry again later, no correctness problems.
- cacheRef.set(value);
- futureRef.set(null);
- future.complete(value);
- });
- return future;
- } else {
- CompletableFuture future = futureRef.get();
- if (future != null) {
- return future;
- }
- }
- }
- }
-
static void updateStats(Optional optStats,
- Optional optMetrics, ServerName serverName, MultiResponse resp) {
+ Optional optMetrics, ServerName serverName, MultiResponse resp) {
if (!optStats.isPresent() && !optMetrics.isPresent()) {
// ServerStatisticTracker and MetricsConnection are both not present, just return
return;
@@ -594,13 +568,13 @@ interface Converter {
@FunctionalInterface
interface RpcCall {
void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
- RpcCallback done);
+ RpcCallback done);
}
static CompletableFuture call(HBaseRpcController controller,
- HRegionLocation loc, ClientService.Interface stub, REQ req,
- Converter reqConvert, RpcCall rpcCall,
- Converter respConverter) {
+ HRegionLocation loc, ClientService.Interface stub, REQ req,
+ Converter reqConvert, RpcCall rpcCall,
+ Converter respConverter) {
CompletableFuture future = new CompletableFuture<>();
try {
rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req),
@@ -652,4 +626,160 @@ static void setCoprocessorError(RpcController controller, Throwable error) {
controller.setFailed(error.toString());
}
}
+
+ public static RegionLocations locateRow(NavigableMap cache,
+ TableName tableName, byte[] row, int replicaId) {
+ Map.Entry entry = cache.floorEntry(row);
+ if (entry == null) {
+ return null;
+ }
+ RegionLocations locs = entry.getValue();
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ return null;
+ }
+ byte[] endKey = loc.getRegion().getEndKey();
+ if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+ Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+ }
+ return locs;
+ } else {
+ return null;
+ }
+ }
+
+ public static RegionLocations locateRowBefore(NavigableMap cache,
+ TableName tableName, byte[] row, int replicaId) {
+ boolean isEmptyStopRow = isEmptyStopRow(row);
+ Map.Entry entry =
+ isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+ if (entry == null) {
+ return null;
+ }
+ RegionLocations locs = entry.getValue();
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ return null;
+ }
+ if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+ (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+ Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+ }
+ return locs;
+ } else {
+ return null;
+ }
+ }
+
+ public static void tryClearMasterStubCache(IOException error,
+ ClientMetaService.Interface currentStub, AtomicReference stub) {
+ if (ClientExceptionsUtil.isConnectionException(error) ||
+ error instanceof ServerNotRunningYetException) {
+ stub.compareAndSet(currentStub, null);
+ }
+ }
+
+ public static CompletableFuture getMasterStub(ConnectionRegistry registry,
+ AtomicReference stub, AtomicReference> stubMakeFuture,
+ RpcClient rpcClient, User user, long rpcTimeout, TimeUnit unit,
+ Function stubMaker, String type) {
+ return getOrFetch(stub, stubMakeFuture, () -> {
+ CompletableFuture future = new CompletableFuture<>();
+ addListener(registry.getActiveMaster(), (addr, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ } else if (addr == null) {
+ future.completeExceptionally(new MasterNotRunningException(
+ "ZooKeeper available but no active master location found"));
+ } else {
+ LOG.debug("The fetched master address is {}", addr);
+ try {
+ future.complete(stubMaker.apply(
+ rpcClient.createRpcChannel(addr, user, toIntNoOverflow(unit.toMillis(rpcTimeout)))));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ }
+
+ });
+ return future;
+ }, type);
+ }
+
+ private static CompletableFuture getOrFetch(AtomicReference cachedRef,
+ AtomicReference> futureRef, Supplier> fetch,
+ String type) {
+ for (;;) {
+ T cachedValue = cachedRef.get();
+ if (cachedValue != null) {
+ return CompletableFuture.completedFuture(cachedValue);
+ }
+ LOG.trace("{} cache is null, try fetching from registry", type);
+ if (futureRef.compareAndSet(null, new CompletableFuture<>())) {
+ LOG.debug("Start fetching {} from registry", type);
+ CompletableFuture future = futureRef.get();
+ addListener(fetch.get(), (value, error) -> {
+ if (error != null) {
+ LOG.debug("Failed to fetch {} from registry", type, error);
+ futureRef.getAndSet(null).completeExceptionally(error);
+ return;
+ }
+ LOG.debug("The fetched {} is {}", type, value);
+ // Here we update cache before reset future, so it is possible that someone can get a
+ // stale value. Consider this:
+ // 1. update cacheRef
+ // 2. someone clears the cache and relocates again
+ // 3. the futureRef is not null so the old future is used.
+ // 4. we clear futureRef and complete the future in it with the value being
+ // cleared in step 2.
+ // But we do not think it is a big deal as it rarely happens, and even if it happens, the
+ // caller will retry again later, no correctness problems.
+ cachedRef.set(value);
+ futureRef.set(null);
+ future.complete(value);
+ });
+ return future;
+ } else {
+ CompletableFuture future = futureRef.get();
+ if (future != null) {
+ return future;
+ }
+ }
+ }
+ }
+
+ public static CompletableFuture> getAllMetaRegionLocations(
+ boolean excludeOfflinedSplitParents,
+ CompletableFuture getStubFuture,
+ AtomicReference stubRef,
+ RpcControllerFactory rpcControllerFactory, int callTimeoutMs) {
+ CompletableFuture> future = new CompletableFuture<>();
+ addListener(getStubFuture, (stub, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ HBaseRpcController controller = rpcControllerFactory.newController();
+ if (callTimeoutMs > 0) {
+ controller.setCallTimeout(callTimeoutMs);
+ }
+ stub.getAllMetaRegionLocations(controller, GetAllMetaRegionLocationsRequest.newBuilder()
+ .setExcludeOfflinedSplitParents(excludeOfflinedSplitParents).build(), resp -> {
+ if (controller.failed()) {
+ IOException ex = controller.getFailed();
+ tryClearMasterStubCache(ex, stub, stubRef);
+ future.completeExceptionally(ex);
+ return;
+ }
+ List locs = resp.getMetaLocationsList().stream()
+ .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList());
+ future.complete(locs);
+ });
+ });
+ return future;
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
index 0f70d0fdd8f9..d760cf03dde9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -54,11 +54,14 @@
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
@@ -66,6 +69,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionResponse;
/**
* Master based registry implementation. Makes RPCs to the configured master addresses from config
@@ -124,7 +129,7 @@ private static Set parseMasterAddrs(Configuration conf) throws Unkno
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
// XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
- // this through the master registry...
+ // this through the connection registry...
// This is a problem as we will use the cluster id to determine the authentication method
rpcClient = RpcClientFactory.createClient(conf, null);
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
@@ -345,4 +350,36 @@ public void close() {
rpcClient.close();
}
}
-}
+
+ private RegionLocations transformRegionLocations(LocateMetaRegionResponse resp) {
+ return new RegionLocations(resp.getMetaLocationsList().stream()
+ .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList()));
+ }
+
+ @Override
+ public CompletableFuture locateMeta(byte[] row, RegionLocateType locateType) {
+ LocateMetaRegionRequest request =
+ LocateMetaRegionRequest.newBuilder().setRow(ByteString.copyFrom(row))
+ .setLocateType(ProtobufUtil.toProtoRegionLocateType(locateType)).build();
+ return this. call((c, s, d) -> s.locateMetaRegion(c, request, d),
+ r -> true, "locateMeta()").thenApply(this::transformRegionLocations);
+ }
+
+ private List
+ transformRegionLocationList(GetAllMetaRegionLocationsResponse resp) {
+ return resp.getMetaLocationsList().stream().map(ProtobufUtil::toRegionLocation)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public CompletableFuture>
+ getAllMetaRegionLocations(boolean excludeOfflinedSplitParents) {
+ GetAllMetaRegionLocationsRequest request = GetAllMetaRegionLocationsRequest.newBuilder()
+ .setExcludeOfflinedSplitParents(excludeOfflinedSplitParents).build();
+ return this
+ . call(
+ (c, s, d) -> s.getAllMetaRegionLocations(c, request, d), r -> true,
+ "getAllMetaRegionLocations(" + excludeOfflinedSplitParents + ")")
+ .thenApply(this::transformRegionLocationList);
+ }
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java
index 1f0754b7de2d..41f203eb34a9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java
@@ -20,7 +20,8 @@
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isEqual;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.locateRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.locateRowBefore;
import java.util.Comparator;
import java.util.Map;
@@ -67,64 +68,17 @@ private void recordClearRegionCache() {
metrics.ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
}
- private RegionLocations locateRow(TableName tableName, byte[] row, int replicaId) {
- Map.Entry entry = cache.floorEntry(row);
- if (entry == null) {
- recordCacheMiss();
- return null;
- }
- RegionLocations locs = entry.getValue();
- HRegionLocation loc = locs.getRegionLocation(replicaId);
- if (loc == null) {
- recordCacheMiss();
- return null;
- }
- byte[] endKey = loc.getRegion().getEndKey();
- if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
- Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
- }
- recordCacheHit();
- return locs;
- } else {
- recordCacheMiss();
- return null;
- }
- }
-
- private RegionLocations locateRowBefore(TableName tableName, byte[] row, int replicaId) {
- boolean isEmptyStopRow = isEmptyStopRow(row);
- Map.Entry entry =
- isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
- if (entry == null) {
- recordCacheMiss();
- return null;
- }
- RegionLocations locs = entry.getValue();
- HRegionLocation loc = locs.getRegionLocation(replicaId);
- if (loc == null) {
- recordCacheMiss();
- return null;
- }
- if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
- (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
- Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
- }
+ RegionLocations locate(TableName tableName, byte[] row, int replicaId,
+ RegionLocateType locateType) {
+ RegionLocations locs = locateType.equals(RegionLocateType.BEFORE) ?
+ locateRowBefore(cache, tableName, row, replicaId) :
+ locateRow(cache, tableName, row, replicaId);
+ if (locs != null) {
recordCacheHit();
- return locs;
} else {
recordCacheMiss();
- return null;
}
- }
-
- RegionLocations locate(TableName tableName, byte[] row, int replicaId,
- RegionLocateType locateType) {
- return locateType.equals(RegionLocateType.BEFORE) ? locateRowBefore(tableName, row, replicaId) :
- locateRow(tableName, row, replicaId);
+ return locs;
}
// if we successfully add the locations to cache, return the locations, otherwise return the one
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
index e8acbda23ac3..a10ef1819952 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
@@ -25,6 +28,8 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
@@ -34,7 +39,12 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
@@ -43,8 +53,12 @@
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
/**
@@ -59,9 +73,29 @@ class ZKConnectionRegistry implements ConnectionRegistry {
private final ZNodePaths znodePaths;
- ZKConnectionRegistry(Configuration conf) {
+ private final AtomicReference cachedStub = new AtomicReference<>();
+
+ private final AtomicReference> stubMakeFuture =
+ new AtomicReference<>();
+
+ // RPC client used to talk to the masters.
+ private final RpcClient rpcClient;
+ private final RpcControllerFactory rpcControllerFactory;
+ private final long readRpcTimeoutNs;
+ private final User user;
+
+ ZKConnectionRegistry(Configuration conf) throws IOException {
this.znodePaths = new ZNodePaths(conf);
this.zk = new ReadOnlyZKClient(conf);
+ // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
+ // this through the connection registry...
+ // This is a problem as we will use the cluster id to determine the authentication method
+ rpcClient = RpcClientFactory.createClient(conf, null);
+ rpcControllerFactory = RpcControllerFactory.instantiate(conf);
+ long rpcTimeoutMs = conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
+ this.readRpcTimeoutNs =
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutMs));
+ this.user = User.getCurrent();
}
private interface Converter {
@@ -229,8 +263,48 @@ public CompletableFuture getActiveMaster() {
});
}
+ private CompletableFuture getStub() {
+ return ConnectionUtils.getMasterStub(this, cachedStub, stubMakeFuture, rpcClient, user,
+ readRpcTimeoutNs, TimeUnit.NANOSECONDS, ClientMetaService::newStub, "ClientMetaService");
+ }
+
+ @Override
+ public CompletableFuture locateMeta(byte[] row, RegionLocateType locateType) {
+ CompletableFuture future = new CompletableFuture<>();
+ addListener(getStub(), (stub, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ HBaseRpcController controller = rpcControllerFactory.newController();
+ stub.locateMetaRegion(controller,
+ LocateMetaRegionRequest.newBuilder().setRow(ByteString.copyFrom(row))
+ .setLocateType(ProtobufUtil.toProtoRegionLocateType(locateType)).build(),
+ resp -> {
+ if (controller.failed()) {
+ IOException ex = controller.getFailed();
+ ConnectionUtils.tryClearMasterStubCache(ex, stub, ZKConnectionRegistry.this.cachedStub);
+ future.completeExceptionally(ex);
+ return;
+ }
+ RegionLocations locs = new RegionLocations(resp.getMetaLocationsList().stream()
+ .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList()));
+ future.complete(locs);
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture>
+ getAllMetaRegionLocations(boolean excludeOfflinedSplitParents) {
+ return ConnectionUtils.getAllMetaRegionLocations(excludeOfflinedSplitParents, getStub(),
+ cachedStub, rpcControllerFactory, -1);
+ }
+
@Override
public void close() {
+ rpcClient.close();
zk.close();
}
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
index 64ded7f630c1..68e9dd5a6b4c 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
@@ -17,8 +17,12 @@
*/
package org.apache.hadoop.hbase.client;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -44,4 +48,15 @@ public CompletableFuture getActiveMaster() {
@Override
public void close() {
}
+
+ @Override
+ public CompletableFuture locateMeta(byte[] row, RegionLocateType type) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture>
+ getAllMetaRegionLocations(boolean excludeOfflinedSplitParents) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/MetaCellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/MetaCellComparator.java
index 1620d9feb05e..0b984cda351a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/MetaCellComparator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/MetaCellComparator.java
@@ -36,6 +36,9 @@
@InterfaceStability.Evolving
public class MetaCellComparator extends CellComparatorImpl {
+ public static final Comparator ROW_COMPARATOR =
+ (r1, r2) -> compareRows(r1, 0, r1.length, r2, 0, r2.length);
+
/**
* A {@link MetaCellComparator} for hbase:meta
catalog table
* {@link KeyValue}s.
@@ -71,7 +74,7 @@ public int compare(final Cell a, final Cell b, boolean ignoreSequenceid) {
return ignoreSequenceid ? diff : Longs.compare(b.getSequenceId(), a.getSequenceId());
}
- public static int compareRows(byte[] left, int loffset, int llength, byte[] right, int roffset,
+ private static int compareRows(byte[] left, int loffset, int llength, byte[] right, int roffset,
int rlength) {
int leftDelimiter = Bytes.searchDelimiterIndex(left, loffset, llength, HConstants.DELIMITER);
int rightDelimiter = Bytes.searchDelimiterIndex(right, roffset, rlength, HConstants.DELIMITER);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 92118ac444f3..8e64b4bed3cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -19,6 +19,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -61,13 +62,13 @@ public interface AsyncClusterConnection extends AsyncConnection {
* original return value is useless.
*/
CompletableFuture replay(TableName tableName, byte[] encodedRegionName, byte[] row,
- List entries, int replicaId, int numRetries, long operationTimeoutNs);
+ List entries, int replicaId, int numRetries, long operationTimeoutNs);
/**
* Return all the replicas for a region. Used for region replica replication.
*/
CompletableFuture getRegionLocations(TableName tableName, byte[] row,
- boolean reload);
+ boolean reload);
/**
* Return the token for this bulk load.
@@ -98,4 +99,9 @@ CompletableFuture bulkLoad(TableName tableName, List cleanupBulkLoad(TableName tableName, String bulkToken);
+
+ /**
+ * Fetch all meta region locations from active master, used by backup masters for caching.
+ */
+ CompletableFuture> getAllMetaRegionLocations(int callTimeoutMs);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 39fc3a28e0c7..cfe62db5eeaf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -20,8 +20,11 @@
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -43,6 +46,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
/**
* The implementation of AsyncClusterConnection.
@@ -50,8 +54,14 @@
@InterfaceAudience.Private
class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClusterConnection {
+ private final AtomicReference cachedClientMetaStub =
+ new AtomicReference<>();
+
+ private final AtomicReference>
+ clientMetaStubMakeFuture = new AtomicReference<>();
+
public AsyncClusterConnectionImpl(Configuration conf, ConnectionRegistry registry,
- String clusterId, SocketAddress localAddress, User user) {
+ String clusterId, SocketAddress localAddress, User user) {
super(conf, registry, clusterId, localAddress, user);
}
@@ -72,14 +82,14 @@ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
@Override
public CompletableFuture flush(byte[] regionName,
- boolean writeFlushWALMarker) {
+ boolean writeFlushWALMarker) {
RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
return admin.flushRegionInternal(regionName, null, writeFlushWALMarker);
}
@Override
public CompletableFuture replay(TableName tableName, byte[] encodedRegionName, byte[] row,
- List entries, int replicaId, int retries, long operationTimeoutNs) {
+ List entries, int replicaId, int retries, long operationTimeoutNs) {
return new AsyncRegionReplicaReplayRetryingCaller(RETRY_TIMER, this,
ConnectionUtils.retries2Attempts(retries), operationTimeoutNs, tableName, encodedRegionName,
row, entries, replicaId).call();
@@ -87,7 +97,7 @@ public CompletableFuture replay(TableName tableName, byte[] encodedRegionN
@Override
public CompletableFuture getRegionLocations(TableName tableName, byte[] row,
- boolean reload) {
+ boolean reload) {
return getLocator().getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L);
}
@@ -132,4 +142,16 @@ public CompletableFuture cleanupBulkLoad(TableName tableName, String bulkT
}, (s, c, req, done) -> s.cleanupBulkLoad(c, req, done), (c, resp) -> null))
.call();
}
+
+ private CompletableFuture getClientMetaStub() {
+ return ConnectionUtils.getMasterStub(registry, cachedClientMetaStub, clientMetaStubMakeFuture,
+ rpcClient, user, rpcTimeout, TimeUnit.MILLISECONDS, ClientMetaService::newStub,
+ "ClientMetaService");
+ }
+
+ @Override
+ public CompletableFuture> getAllMetaRegionLocations(int callTimeoutMs) {
+ return ConnectionUtils.getAllMetaRegionLocations(false, getClientMetaStub(),
+ cachedClientMetaStub, rpcControllerFactory, callTimeoutMs);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 538a718f5c46..793a4739079a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -238,6 +238,7 @@
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
@@ -380,12 +381,11 @@ public void run() {
// manager of assignment nodes in zookeeper
private AssignmentManager assignmentManager;
-
/**
* Cache for the meta region replica's locations. Also tracks their changes to avoid stale
* cache entries.
*/
- private final MetaRegionLocationCache metaRegionLocationCache;
+ private volatile MetaLocationCache metaLocationCache;
private RSGroupInfoManager rsGroupInfoManager;
@@ -602,7 +602,7 @@ public HMaster(final Configuration conf) throws IOException {
}
}
- this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper);
+ this.metaLocationCache = new MetaLocationCache(this);
this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
cachedClusterId = new CachedClusterId(this, conf);
@@ -633,6 +633,25 @@ protected String getUseThisHostnameInstead(Configuration conf) {
@Override
public void run() {
try {
+ // we have to do this in a background thread as for a fresh new cluster, we need to become
+ // active master first to set the cluster id so we can initialize the cluster connection.
+ // for backup master, we need to use async cluster connection to connect to active master for
+ // fetching the content of root table, to serve the locate meta requests from client.
+ Threads.setDaemonThreadRunning(new Thread(() -> {
+ for (;;) {
+ try {
+ if (!Strings.isNullOrEmpty(ZKClusterId.readClusterIdZNode(zooKeeper))) {
+ setupClusterConnection();
+ break;
+ } else {
+ LOG.trace("cluster id is still null, waiting...");
+ }
+ } catch (Throwable t) {
+ LOG.warn("failed to initialize cluster connection, retrying...");
+ }
+ Threads.sleep(1000);
+ }
+ }), getName() + ":initClusterConnection");
Threads.setDaemonThreadRunning(new Thread(() -> {
try {
int infoPort = putUpJettyServer();
@@ -993,9 +1012,7 @@ private void tryMigrateRootTableFromZooKeeper() throws IOException, KeeperExcept
*/
private void finishActiveMasterInitialization(MonitoredTask status) throws IOException,
InterruptedException, KeeperException, ReplicationException {
- /*
- * We are active master now... go initialize components we need to run.
- */
+ // We are active master now... go initialize components we need to run.
status.setStatus("Initializing Master file system");
this.masterActiveTime = System.currentTimeMillis();
@@ -1039,6 +1056,7 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
status.setStatus("Initialize ServerManager and schedule SCP for crash servers");
// The below two managers must be created before loading procedures, as they will be used during
// loading.
+ setupClusterConnection();
this.serverManager = createServerManager(this);
this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
@@ -1051,6 +1069,10 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
tryMigrateRootTableFromZooKeeper();
+ // stop meta location cache, as now we do not need sync from active master any more.
+ metaLocationCache.stop("we are active master now");
+ metaLocationCache = null;
+
createProcedureExecutor();
Map, List>> procsByType =
procedureExecutor.getActiveProceduresNoCopy().stream()
@@ -1412,18 +1434,12 @@ protected MasterMetaBootstrap createMetaBootstrap() {
}
/**
- *
* Create a {@link ServerManager} instance.
- *
- *
+ *
* Will be overridden in tests.
- *
*/
@VisibleForTesting
protected ServerManager createServerManager(final MasterServices master) throws IOException {
- // We put this out here in a method so can do a Mockito.spy and stub it out
- // w/ a mocked up ServerManager.
- setupClusterConnection();
return new ServerManager(master);
}
@@ -2325,16 +2341,14 @@ public long createSystemTable(final TableDescriptor tableDescriptor) throws IOEx
private void startActiveMasterManager(int infoPort) throws KeeperException {
String backupZNode = ZNodePaths.joinZNode(
zooKeeper.getZNodePaths().backupMasterAddressesZNode, serverName.toString());
- /*
- * Add a ZNode for ourselves in the backup master directory since we
- * may not become the active master. If so, we want the actual active
- * master to know we are backup masters, so that it won't assign
- * regions to us if so configured.
- *
- * If we become the active master later, ActiveMasterManager will delete
- * this node explicitly. If we crash before then, ZooKeeper will delete
- * this node for us since it is ephemeral.
- */
+ /**
+ * Add a ZNode for ourselves in the backup master directory since we may not become the active
+ * master. If so, we want the actual active master to know we are backup masters, so that it
+ * won't assign regions to us if so configured.
+ *
+ * If we become the active master later, ActiveMasterManager will delete this node explicitly.
+ * If we crash before then, ZooKeeper will delete this node for us since it is ephemeral.
+ */
LOG.info("Adding backup master ZNode " + backupZNode);
if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName, infoPort)) {
LOG.warn("Failed create of " + backupZNode + " by " + serverName);
@@ -3973,8 +3987,8 @@ public void runReplicationBarrierCleaner() {
}
}
- public MetaRegionLocationCache getMetaRegionLocationCache() {
- return this.metaRegionLocationCache;
+ public MetaLocationCache getMetaLocationCache() {
+ return this.metaLocationCache;
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index b9b4f64f048b..fbda2464ccff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -2981,13 +2981,27 @@ public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequ
@Override
public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController rpcController,
- GetMetaRegionLocationsRequest request) throws ServiceException {
- GetMetaRegionLocationsResponse.Builder response = GetMetaRegionLocationsResponse.newBuilder();
- Optional> metaLocations =
- master.getMetaRegionLocationCache().getMetaRegionLocations();
- metaLocations.ifPresent(hRegionLocations -> hRegionLocations.forEach(
- location -> response.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
- return response.build();
+ GetMetaRegionLocationsRequest request) throws ServiceException {
+ MetaLocationCache cache = master.getMetaLocationCache();
+ RegionLocations locs;
+ try {
+ if (cache != null) {
+ locs = cache.locateMeta(HConstants.EMPTY_BYTE_ARRAY, RegionLocateType.CURRENT);
+ } else {
+ locs = master.locateMeta(HConstants.EMPTY_BYTE_ARRAY, RegionLocateType.CURRENT);
+ }
+ GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();
+ if (locs != null) {
+ for (HRegionLocation loc : locs) {
+ if (loc != null) {
+ builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
+ }
+ }
+ }
+ return builder.build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
}
@Override
@@ -3374,11 +3388,16 @@ public LocateMetaRegionResponse locateMetaRegion(RpcController controller,
byte[] row = request.getRow().toByteArray();
RegionLocateType locateType = ProtobufUtil.toRegionLocateType(request.getLocateType());
try {
- master.checkServiceStarted();
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preLocateMetaRegion(row, locateType);
}
- RegionLocations locs = master.locateMeta(row, locateType);
+ MetaLocationCache cache = master.getMetaLocationCache();
+ RegionLocations locs;
+ if (cache != null) {
+ locs = cache.locateMeta(row, locateType);
+ } else {
+ locs = master.locateMeta(row, locateType);
+ }
List list = new ArrayList<>();
LocateMetaRegionResponse.Builder builder = LocateMetaRegionResponse.newBuilder();
if (locs != null) {
@@ -3403,24 +3422,31 @@ public GetAllMetaRegionLocationsResponse getAllMetaRegionLocations(RpcController
GetAllMetaRegionLocationsRequest request) throws ServiceException {
boolean excludeOfflinedSplitParents = request.getExcludeOfflinedSplitParents();
try {
- master.checkServiceStarted();
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preGetAllMetaRegionLocations(excludeOfflinedSplitParents);
}
- List locs = master.getAllMetaRegionLocations(excludeOfflinedSplitParents);
- List list = new ArrayList<>();
- GetAllMetaRegionLocationsResponse.Builder builder =
- GetAllMetaRegionLocationsResponse.newBuilder();
- if (locs != null) {
- for (RegionLocations ls : locs) {
- for (HRegionLocation loc : ls) {
- if (loc != null) {
- builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
- list.add(loc);
+ MetaLocationCache cache = master.getMetaLocationCache();
+ List list;
+ if (cache != null) {
+ list = cache.getAllMetaRegionLocations(excludeOfflinedSplitParents);
+ } else {
+ List locs = master.getAllMetaRegionLocations(excludeOfflinedSplitParents);
+ list = new ArrayList<>();
+ if (locs != null) {
+ for (RegionLocations ls : locs) {
+ for (HRegionLocation loc : ls) {
+ if (loc != null) {
+ list.add(loc);
+ }
}
}
}
}
+ GetAllMetaRegionLocationsResponse.Builder builder =
+ GetAllMetaRegionLocationsResponse.newBuilder();
+ for (HRegionLocation loc : list) {
+ builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
+ }
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postGetAllMetaRegionLocations(excludeOfflinedSplitParents,
list);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaLocationCache.java
new file mode 100644
index 000000000000..0f5cf110a5ab
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaLocationCache.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.locateRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.locateRowBefore;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaCellComparator;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocateType;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A cache of meta region locations.
+ */
+@InterfaceAudience.Private
+class MetaLocationCache implements Stoppable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MetaLocationCache.class);
+
+ @VisibleForTesting
+ static final String SYNC_INTERVAL_SECONDS =
+ "hbase.master.meta-location-cache.sync-interval-seconds";
+
+ // default sync every 1 second.
+ @VisibleForTesting
+ static final int DEFAULT_SYNC_INTERVAL_SECONDS = 1;
+
+ private static final String FETCH_TIMEOUT_MS =
+ "hbase.master.meta-location-cache.fetch-timeout-ms";
+
+ // default timeout 1 second
+ private static final int DEFAULT_FETCH_TIMEOUT_MS = 1000;
+
+ private static final class CacheHolder {
+
+ final NavigableMap cache;
+
+ final List all;
+
+ CacheHolder(List all) {
+ this.all = Collections.unmodifiableList(all);
+ NavigableMap> startKeyToLocs =
+ new TreeMap<>(MetaCellComparator.ROW_COMPARATOR);
+ for (HRegionLocation loc : all) {
+ if (loc.getRegion().isSplitParent()) {
+ continue;
+ }
+ startKeyToLocs.computeIfAbsent(loc.getRegion().getStartKey(),
+ k -> new TreeSet<>((l1, l2) -> l1.getRegion().compareTo(l2.getRegion()))).add(loc);
+ }
+ this.cache = startKeyToLocs.entrySet().stream().collect(Collectors.collectingAndThen(
+ Collectors.toMap(Map.Entry::getKey, e -> new RegionLocations(e.getValue()), (u, v) -> {
+ throw new IllegalStateException();
+ }, () -> new TreeMap<>(MetaCellComparator.ROW_COMPARATOR)),
+ Collections::unmodifiableNavigableMap));
+ }
+ }
+
+ private volatile CacheHolder holder;
+
+ private volatile boolean stopped = false;
+
+ MetaLocationCache(MasterServices master) {
+ int syncIntervalSeconds =
+ master.getConfiguration().getInt(SYNC_INTERVAL_SECONDS, DEFAULT_SYNC_INTERVAL_SECONDS);
+ int fetchTimeoutMs =
+ master.getConfiguration().getInt(FETCH_TIMEOUT_MS, DEFAULT_FETCH_TIMEOUT_MS);
+ master.getChoreService().scheduleChore(new ScheduledChore(
+ getClass().getSimpleName() + "-Sync-Chore", this, syncIntervalSeconds, 0, TimeUnit.SECONDS) {
+
+ @Override
+ protected void chore() {
+ AsyncClusterConnection conn = master.getAsyncClusterConnection();
+ if (conn != null) {
+ addListener(conn.getAllMetaRegionLocations(fetchTimeoutMs), (locs, error) -> {
+ if (error != null) {
+ LOG.warn("Failed to fetch all meta region locations from active master", error);
+ return;
+ }
+ holder = new CacheHolder(locs);
+ });
+ }
+ }
+ });
+ }
+
+ RegionLocations locateMeta(byte[] row, RegionLocateType locateType) {
+ if (locateType == RegionLocateType.AFTER) {
+ // as we know the exact row after us, so we can just create the new row, and use the same
+ // algorithm to locate it.
+ row = Arrays.copyOf(row, row.length + 1);
+ locateType = RegionLocateType.CURRENT;
+ }
+ CacheHolder holder = this.holder;
+ if (holder == null) {
+ return null;
+ }
+ return locateType.equals(RegionLocateType.BEFORE) ?
+ locateRowBefore(holder.cache, TableName.META_TABLE_NAME, row, RegionInfo.DEFAULT_REPLICA_ID) :
+ locateRow(holder.cache, TableName.META_TABLE_NAME, row, RegionInfo.DEFAULT_REPLICA_ID);
+ }
+
+ List getAllMetaRegionLocations(boolean excludeOfflinedSplitParents) {
+ CacheHolder holder = this.holder;
+ if (holder == null) {
+ return Collections.emptyList();
+ }
+ if (!excludeOfflinedSplitParents) {
+ // just return all the locations
+ return holder.all;
+ } else {
+ return holder.all.stream().filter(l -> !l.getRegion().isSplitParent())
+ .collect(Collectors.toList());
+ }
+ }
+
+ @Override
+ public void stop(String why) {
+ LOG.info("Stopping meta location cache: {}", why);
+ this.stopped = true;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return stopped;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
deleted file mode 100644
index fef1a84b3a4c..000000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ThreadFactory;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.hadoop.hbase.zookeeper.ZKListener;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-
-/**
- * A cache of meta region location metadata. Registers a listener on ZK to track changes to the meta
- * table znodes. Clients are expected to retry if the meta information is stale. This class is
- * thread-safe (a single instance of this class can be shared by multiple threads without race
- * conditions).
- * @deprecated Now we store meta location in the local store at master side so we should get the
- * meta location from active master instead of zk, keep it here only for compatibility.
- */
-@Deprecated
-@InterfaceAudience.Private
-public class MetaRegionLocationCache extends ZKListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(MetaRegionLocationCache.class);
-
- /**
- * Maximum number of times we retry when ZK operation times out.
- */
- private static final int MAX_ZK_META_FETCH_RETRIES = 10;
- /**
- * Sleep interval ms between ZK operation retries.
- */
- private static final int SLEEP_INTERVAL_MS_BETWEEN_RETRIES = 1000;
- private static final int SLEEP_INTERVAL_MS_MAX = 10000;
- private final RetryCounterFactory retryCounterFactory =
- new RetryCounterFactory(MAX_ZK_META_FETCH_RETRIES, SLEEP_INTERVAL_MS_BETWEEN_RETRIES);
-
- /**
- * Cached meta region locations indexed by replica ID.
- * CopyOnWriteArrayMap ensures synchronization during updates and a consistent snapshot during
- * client requests. Even though CopyOnWriteArrayMap copies the data structure for every write,
- * that should be OK since the size of the list is often small and mutations are not too often
- * and we do not need to block client requests while mutations are in progress.
- */
- private final CopyOnWriteArrayMap cachedMetaLocations;
-
- private enum ZNodeOpType {
- INIT,
- CREATED,
- CHANGED,
- DELETED
- }
-
- public MetaRegionLocationCache(ZKWatcher zkWatcher) {
- super(zkWatcher);
- cachedMetaLocations = new CopyOnWriteArrayMap<>();
- watcher.registerListener(this);
- // Populate the initial snapshot of data from meta znodes.
- // This is needed because stand-by masters can potentially start after the initial znode
- // creation. It blocks forever until the initial meta locations are loaded from ZK and watchers
- // are established. Subsequent updates are handled by the registered listener. Also, this runs
- // in a separate thread in the background to not block master init.
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
- RetryCounterFactory retryFactory = new RetryCounterFactory(
- Integer.MAX_VALUE, SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX);
- threadFactory.newThread(
- ()->loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT)).start();
- }
-
- /**
- * Populates the current snapshot of meta locations from ZK. If no meta znodes exist, it registers
- * a watcher on base znode to check for any CREATE/DELETE events on the children.
- * @param retryCounter controls the number of retries and sleep between retries.
- */
- private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opType) {
- List znodes = null;
- while (retryCounter.shouldRetry()) {
- try {
- znodes = watcher.getMetaReplicaNodesAndWatchChildren();
- break;
- } catch (KeeperException ke) {
- LOG.debug("Error populating initial meta locations", ke);
- if (!retryCounter.shouldRetry()) {
- // Retries exhausted and watchers not set. This is not a desirable state since the cache
- // could remain stale forever. Propagate the exception.
- watcher.abort("Error populating meta locations", ke);
- return;
- }
- try {
- retryCounter.sleepUntilNextRetry();
- } catch (InterruptedException ie) {
- LOG.error("Interrupted while loading meta locations from ZK", ie);
- Thread.currentThread().interrupt();
- return;
- }
- }
- }
- if (znodes == null || znodes.isEmpty()) {
- // No meta znodes exist at this point but we registered a watcher on the base znode to listen
- // for updates. They will be handled via nodeChildrenChanged().
- return;
- }
- if (znodes.size() == cachedMetaLocations.size()) {
- // No new meta znodes got added.
- return;
- }
- for (String znode: znodes) {
- String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
- updateMetaLocation(path, opType);
- }
- }
-
- /**
- * Gets the HRegionLocation for a given meta replica ID. Renews the watch on the znode for
- * future updates.
- * @param replicaId ReplicaID of the region.
- * @return HRegionLocation for the meta replica.
- * @throws KeeperException if there is any issue fetching/parsing the serialized data.
- */
- private HRegionLocation getMetaRegionLocation(int replicaId)
- throws KeeperException {
- RegionState metaRegionState;
- try {
- byte[] data = ZKUtil.getDataAndWatch(watcher,
- watcher.getZNodePaths().getZNodeForReplica(replicaId));
- metaRegionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId);
- } catch (DeserializationException e) {
- throw ZKUtil.convert(e);
- }
- return new HRegionLocation(metaRegionState.getRegion(), metaRegionState.getServerName());
- }
-
- private void updateMetaLocation(String path, ZNodeOpType opType) {
- if (!isValidMetaZNode(path)) {
- return;
- }
- LOG.debug("Updating meta znode for path {}: {}", path, opType.name());
- int replicaId = watcher.getZNodePaths().getMetaReplicaIdFromPath(path);
- RetryCounter retryCounter = retryCounterFactory.create();
- HRegionLocation location = null;
- while (retryCounter.shouldRetry()) {
- try {
- if (opType == ZNodeOpType.DELETED) {
- if (!ZKUtil.watchAndCheckExists(watcher, path)) {
- // The path does not exist, we've set the watcher and we can break for now.
- break;
- }
- // If it is a transient error and the node appears right away, we fetch the
- // latest meta state.
- }
- location = getMetaRegionLocation(replicaId);
- break;
- } catch (KeeperException e) {
- LOG.debug("Error getting meta location for path {}", path, e);
- if (!retryCounter.shouldRetry()) {
- LOG.warn("Error getting meta location for path {}. Retries exhausted.", path, e);
- break;
- }
- try {
- retryCounter.sleepUntilNextRetry();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- return;
- }
- }
- }
- if (location == null) {
- cachedMetaLocations.remove(replicaId);
- return;
- }
- cachedMetaLocations.put(replicaId, location);
- }
-
- /**
- * @return Optional list of HRegionLocations for meta replica(s), null if the cache is empty.
- *
- */
- public Optional> getMetaRegionLocations() {
- ConcurrentNavigableMap snapshot =
- cachedMetaLocations.tailMap(cachedMetaLocations.firstKey());
- if (snapshot.isEmpty()) {
- // This could be possible if the master has not successfully initialized yet or meta region
- // is stuck in some weird state.
- return Optional.empty();
- }
- List result = new ArrayList<>();
- // Explicitly iterate instead of new ArrayList<>(snapshot.values()) because the underlying
- // ArrayValueCollection does not implement toArray().
- snapshot.values().forEach(location -> result.add(location));
- return Optional.of(result);
- }
-
- /**
- * Helper to check if the given 'path' corresponds to a meta znode. This listener is only
- * interested in changes to meta znodes.
- */
- private boolean isValidMetaZNode(String path) {
- return watcher.getZNodePaths().isAnyMetaReplicaZNode(path);
- }
-
- @Override
- public void nodeCreated(String path) {
- updateMetaLocation(path, ZNodeOpType.CREATED);
- }
-
- @Override
- public void nodeDeleted(String path) {
- updateMetaLocation(path, ZNodeOpType.DELETED);
- }
-
- @Override
- public void nodeDataChanged(String path) {
- updateMetaLocation(path, ZNodeOpType.CHANGED);
- }
-
- @Override
- public void nodeChildrenChanged(String path) {
- if (!path.equals(watcher.getZNodePaths().baseZNode)) {
- return;
- }
- loadMetaLocationsFromZk(retryCounterFactory.create(), ZNodeOpType.CHANGED);
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 7e0d3facec19..7eeb13dd401d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -289,7 +289,7 @@ public class HRegionServer extends Thread implements
/**
* The asynchronous cluster connection to be shared by services.
*/
- protected AsyncClusterConnection asyncClusterConnection;
+ protected volatile AsyncClusterConnection asyncClusterConnection;
/**
* Go here to get table descriptors.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 87557499f7ea..4385a5a731c7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -22,6 +22,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -157,4 +158,9 @@ public CompletableFuture cleanupBulkLoad(TableName tableName, String bulkT
public Connection toConnection() {
return null;
}
+
+ @Override
+ public CompletableFuture> getAllMetaRegionLocations(int callTimeoutMs) {
+ return null;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
index ea1122c18f9a..72fc7058b35f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
@@ -17,8 +17,11 @@
*/
package org.apache.hadoop.hbase.client;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
/**
@@ -28,7 +31,7 @@
public class DummyConnectionRegistry implements ConnectionRegistry {
public static final String REGISTRY_IMPL_CONF_KEY =
- HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
+ HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
@Override
public CompletableFuture getClusterId() {
@@ -43,4 +46,15 @@ public CompletableFuture getActiveMaster() {
@Override
public void close() {
}
+
+ @Override
+ public CompletableFuture locateMeta(byte[] row, RegionLocateType type) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture>
+ getAllMetaRegionLocations(boolean excludeOfflinedSplitParents) {
+ return null;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
index 555e7ac8592a..570f149763e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
@@ -24,7 +24,6 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -33,7 +32,6 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.Waiter;
@@ -45,6 +43,7 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@Category({ MediumTests.class, ClientTests.class })
@@ -107,31 +106,6 @@ public void testMasterAddressParsing() throws IOException {
}
}
- @Test
- public void testRegistryRPCs() throws Exception {
- Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
- HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
- final int size =
- activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get().size();
- for (int numHedgedReqs = 1; numHedgedReqs <= size; numHedgedReqs++) {
- conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
- try (MasterRegistry registry = new MasterRegistry(conf)) {
- // Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
- // because not all replicas had made it up before test started.
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
- assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
- assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
- List metaLocations =
- Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
- List actualMetaLocations =
- activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
- Collections.sort(metaLocations);
- Collections.sort(actualMetaLocations);
- assertEquals(actualMetaLocations, metaLocations);
- }
- }
- }
-
/**
* Tests that the list of masters configured in the MasterRegistry is dynamically refreshed in the
* event of errors.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
deleted file mode 100644
index 71abaae17b8b..000000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MultithreadedTestUtil;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.MetaRegionLocationCache;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
-
-@Category({ SmallTests.class, MasterTests.class })
-public class TestMetaRegionLocationCache {
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMetaRegionLocationCache.class);
-
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static ConnectionRegistry REGISTRY;
-
- @BeforeClass
- public static void setUp() throws Exception {
- TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
- TEST_UTIL.startMiniCluster(3);
- REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
- TEST_UTIL.getAdmin().balancerSwitch(false, true);
- }
-
- @AfterClass
- public static void cleanUp() throws Exception {
- Closeables.close(REGISTRY, true);
- TEST_UTIL.shutdownMiniCluster();
- }
-
- private List getCurrentMetaLocations(ZKWatcher zk) throws Exception {
- List result = new ArrayList<>();
- for (String znode : zk.getMetaReplicaNodes()) {
- String path = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, znode);
- int replicaId = zk.getZNodePaths().getMetaReplicaIdFromPath(path);
- RegionState state = MetaTableLocator.getMetaRegionState(zk, replicaId);
- result.add(new HRegionLocation(state.getRegion(), state.getServerName()));
- }
- return result;
- }
-
- // Verifies that the cached meta locations in the given master are in sync with what is in ZK.
- private void verifyCachedMetaLocations(HMaster master) throws Exception {
- // Wait until initial meta locations are loaded.
- int retries = 0;
- while (!master.getMetaRegionLocationCache().getMetaRegionLocations().isPresent()) {
- Thread.sleep(1000);
- if (++retries == 10) {
- break;
- }
- }
- List metaHRLs =
- master.getMetaRegionLocationCache().getMetaRegionLocations().get();
- assertFalse(metaHRLs.isEmpty());
- ZKWatcher zk = master.getZooKeeper();
- List metaZnodes = zk.getMetaReplicaNodes();
- assertEquals(metaZnodes.size(), metaHRLs.size());
- List actualHRLs = getCurrentMetaLocations(zk);
- Collections.sort(metaHRLs);
- Collections.sort(actualHRLs);
- assertEquals(actualHRLs, metaHRLs);
- }
-
- @Test
- public void testInitialMetaLocations() throws Exception {
- verifyCachedMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster());
- }
-
- @Test
- public void testStandByMetaLocations() throws Exception {
- HMaster standBy = TEST_UTIL.getMiniHBaseCluster().startMaster().getMaster();
- verifyCachedMetaLocations(standBy);
- }
-
- /*
- * Shuffles the meta region replicas around the cluster and makes sure the cache is not stale.
- */
- @Test
- public void testMetaLocationsChange() throws Exception {
- List currentMetaLocs =
- getCurrentMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper());
- // Move these replicas to random servers.
- for (HRegionLocation location : currentMetaLocs) {
- RegionReplicaTestHelper.moveRegion(TEST_UTIL, location);
- }
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
- for (JVMClusterUtil.MasterThread masterThread : TEST_UTIL.getMiniHBaseCluster()
- .getMasterThreads()) {
- verifyCachedMetaLocations(masterThread.getMaster());
- }
- }
-
- /**
- * Tests MetaRegionLocationCache's init procedure to make sure that it correctly watches the base
- * znode for notifications.
- */
- @Test
- public void testMetaRegionLocationCache() throws Exception {
- final String parentZnodeName = "/randomznodename";
- Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
- conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parentZnodeName);
- ServerName sn = ServerName.valueOf("localhost", 1234, 5678);
- try (ZKWatcher zkWatcher = new ZKWatcher(conf, null, null, true)) {
- // A thread that repeatedly creates and drops an unrelated child znode. This is to simulate
- // some ZK activity in the background.
- MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
- ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) {
- @Override
- public void doAnAction() throws Exception {
- final String testZnode = parentZnodeName + "/child";
- ZKUtil.createNodeIfNotExistsAndWatch(zkWatcher, testZnode, testZnode.getBytes());
- ZKUtil.deleteNode(zkWatcher, testZnode);
- }
- });
- ctx.startThreads();
- try {
- MetaRegionLocationCache metaCache = new MetaRegionLocationCache(zkWatcher);
- // meta znodes do not exist at this point, cache should be empty.
- assertFalse(metaCache.getMetaRegionLocations().isPresent());
- // Set the meta locations for a random meta replicas, simulating an active hmaster meta
- // assignment.
- for (int i = 0; i < 3; i++) {
- // Updates the meta znodes.
- MetaTableLocator.setMetaLocation(zkWatcher, sn, i, RegionState.State.OPEN);
- }
- // Wait until the meta cache is populated.
- int iters = 0;
- while (iters++ < 10) {
- if (metaCache.getMetaRegionLocations().isPresent() &&
- metaCache.getMetaRegionLocations().get().size() == 3) {
- break;
- }
- Thread.sleep(1000);
- }
- List metaLocations = metaCache.getMetaRegionLocations().get();
- assertEquals(3, metaLocations.size());
- for (HRegionLocation location : metaLocations) {
- assertEquals(sn, location.getServerName());
- }
- } finally {
- // clean up.
- ctx.stop();
- ZKUtil.deleteChildrenRecursively(zkWatcher, parentZnodeName);
- }
- }
- }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
index af4533bdab39..2dd818a02e5a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
@@ -118,7 +118,7 @@ public void testIndependentZKConnections() throws IOException {
}
@Test
- public void testNoMetaAvailable() throws InterruptedException {
+ public void testNoMetaAvailable() throws InterruptedException, IOException {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.set("zookeeper.znode.metaserver", "whatever");
try (ZKConnectionRegistry registry = new ZKConnectionRegistry(conf)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java
index 428aee2a142c..233459733ae0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java
@@ -21,8 +21,10 @@
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.junit.Assert.assertEquals;
+
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -30,6 +32,7 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -40,26 +43,35 @@
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RegionLocateType;
-@Category({MediumTests.class, MasterTests.class})
+@Category({ MediumTests.class, MasterTests.class })
public class TestClientMetaServiceRPCs {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestClientMetaServiceRPCs.class);
+ HBaseClassTestRule.forClass(TestClientMetaServiceRPCs.class);
// Total number of masters (active + stand by) for the purpose of this test.
private static final int MASTER_COUNT = 3;
@@ -75,10 +87,21 @@ public static void setUp() throws Exception {
builder.numMasters(MASTER_COUNT).numRegionServers(3);
TEST_UTIL.startMiniCluster(builder.build());
conf = TEST_UTIL.getConfiguration();
- rpcTimeout = (int) Math.min(Integer.MAX_VALUE, TimeUnit.MILLISECONDS.toNanos(
- conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
+ rpcTimeout = (int) Math.min(Integer.MAX_VALUE, TimeUnit.MILLISECONDS
+ .toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
rpcClient = RpcClientFactory.createClient(conf,
- TEST_UTIL.getMiniHBaseCluster().getMaster().getClusterId());
+ TEST_UTIL.getMiniHBaseCluster().getMaster().getClusterId());
+ // make sure all masters have cluster connection set up
+ TEST_UTIL.waitFor(30000, () -> {
+ for (MasterThread mt : TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
+ if (mt.getMaster().getAsyncClusterConnection() == null) {
+ return false;
+ }
+ }
+ return true;
+ });
+ Thread.sleep(2 * conf.getInt(MetaLocationCache.SYNC_INTERVAL_SECONDS,
+ MetaLocationCache.DEFAULT_SYNC_INTERVAL_SECONDS) * 1000);
}
@AfterClass
@@ -90,9 +113,9 @@ public static void tearDown() throws Exception {
}
private static ClientMetaService.BlockingInterface getMasterStub(ServerName server)
- throws IOException {
- return ClientMetaService.newBlockingStub(
- rpcClient.createBlockingRpcChannel(server, User.getCurrent(), rpcTimeout));
+ throws IOException {
+ return ClientMetaService
+ .newBlockingStub(rpcClient.createBlockingRpcChannel(server, User.getCurrent(), rpcTimeout));
}
private static HBaseRpcController getRpcController() {
@@ -102,16 +125,17 @@ private static HBaseRpcController getRpcController() {
/**
* Verifies the cluster ID from all running masters.
*/
- @Test public void TestClusterID() throws Exception {
+ @Test
+ public void TestClusterID() throws Exception {
HBaseRpcController rpcController = getRpcController();
String clusterID = TEST_UTIL.getMiniHBaseCluster().getMaster().getClusterId();
int rpcCount = 0;
- for (JVMClusterUtil.MasterThread masterThread:
- TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
+ for (JVMClusterUtil.MasterThread masterThread : TEST_UTIL.getMiniHBaseCluster()
+ .getMasterThreads()) {
ClientMetaService.BlockingInterface stub =
- getMasterStub(masterThread.getMaster().getServerName());
+ getMasterStub(masterThread.getMaster().getServerName());
GetClusterIdResponse resp =
- stub.getClusterId(rpcController, GetClusterIdRequest.getDefaultInstance());
+ stub.getClusterId(rpcController, GetClusterIdRequest.getDefaultInstance());
assertEquals(clusterID, resp.getClusterId());
rpcCount++;
}
@@ -121,40 +145,93 @@ private static HBaseRpcController getRpcController() {
/**
* Verifies the active master ServerName as seen by all masters.
*/
- @Test public void TestActiveMaster() throws Exception {
+ @Test
+ public void TestActiveMaster() throws Exception {
HBaseRpcController rpcController = getRpcController();
ServerName activeMaster = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName();
int rpcCount = 0;
- for (JVMClusterUtil.MasterThread masterThread:
- TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
+ for (JVMClusterUtil.MasterThread masterThread : TEST_UTIL.getMiniHBaseCluster()
+ .getMasterThreads()) {
ClientMetaService.BlockingInterface stub =
- getMasterStub(masterThread.getMaster().getServerName());
+ getMasterStub(masterThread.getMaster().getServerName());
GetActiveMasterResponse resp =
- stub.getActiveMaster(rpcController, GetActiveMasterRequest.getDefaultInstance());
+ stub.getActiveMaster(rpcController, GetActiveMasterRequest.getDefaultInstance());
assertEquals(activeMaster, ProtobufUtil.toServerName(resp.getServerName()));
rpcCount++;
}
assertEquals(MASTER_COUNT, rpcCount);
}
+ private List getMetaLocations() throws IOException {
+ List metaLocations = new ArrayList<>();
+ for (RegionLocations locs : TEST_UTIL.getMiniHBaseCluster().getMaster()
+ .getAllMetaRegionLocations(true)) {
+ metaLocations.addAll(Arrays.asList(locs.getRegionLocations()));
+ }
+ Collections.sort(metaLocations);
+ return metaLocations;
+ }
+
/**
* Verifies that the meta region locations RPC returns consistent results across all masters.
*/
- @Test public void TestMetaLocations() throws Exception {
+ @Test
+ public void TestMetaLocations() throws Exception {
HBaseRpcController rpcController = getRpcController();
- List metaLocations = TEST_UTIL.getMiniHBaseCluster().getMaster()
- .getMetaRegionLocationCache().getMetaRegionLocations().get();
- Collections.sort(metaLocations);
+ List metaLocations = getMetaLocations();
+ int rpcCount = 0;
+ for (JVMClusterUtil.MasterThread masterThread : TEST_UTIL.getMiniHBaseCluster()
+ .getMasterThreads()) {
+ ClientMetaService.BlockingInterface stub =
+ getMasterStub(masterThread.getMaster().getServerName());
+ GetMetaRegionLocationsResponse resp = stub.getMetaRegionLocations(rpcController,
+ GetMetaRegionLocationsRequest.getDefaultInstance());
+ List result = new ArrayList<>();
+ resp.getMetaLocationsList()
+ .forEach(location -> result.add(ProtobufUtil.toRegionLocation(location)));
+ Collections.sort(result);
+ assertEquals(metaLocations, result);
+ rpcCount++;
+ }
+ assertEquals(MASTER_COUNT, rpcCount);
+ }
+
+ @Test
+ public void testLocateMeta() throws Exception {
+ HBaseRpcController rpcController = getRpcController();
+ List metaLocations = getMetaLocations();
+ int rpcCount = 0;
+ for (JVMClusterUtil.MasterThread masterThread : TEST_UTIL.getMiniHBaseCluster()
+ .getMasterThreads()) {
+ ClientMetaService.BlockingInterface stub =
+ getMasterStub(masterThread.getMaster().getServerName());
+ LocateMetaRegionResponse resp = stub.locateMetaRegion(rpcController,
+ LocateMetaRegionRequest.newBuilder().setRow(ByteString.EMPTY)
+ .setLocateType(RegionLocateType.REGION_LOCATE_TYPE_CURRENT).build());
+ List result = new ArrayList<>();
+ resp.getMetaLocationsList()
+ .forEach(location -> result.add(ProtobufUtil.toRegionLocation(location)));
+ Collections.sort(result);
+ assertEquals(metaLocations, result);
+ rpcCount++;
+ }
+ assertEquals(MASTER_COUNT, rpcCount);
+ }
+
+ @Test
+ public void testGetAllMetaLocations() throws Exception {
+ HBaseRpcController rpcController = getRpcController();
+ List metaLocations = getMetaLocations();
int rpcCount = 0;
- for (JVMClusterUtil.MasterThread masterThread:
- TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
+ for (JVMClusterUtil.MasterThread masterThread : TEST_UTIL.getMiniHBaseCluster()
+ .getMasterThreads()) {
ClientMetaService.BlockingInterface stub =
- getMasterStub(masterThread.getMaster().getServerName());
- GetMetaRegionLocationsResponse resp = stub.getMetaRegionLocations(
- rpcController, GetMetaRegionLocationsRequest.getDefaultInstance());
+ getMasterStub(masterThread.getMaster().getServerName());
+ GetAllMetaRegionLocationsResponse resp = stub.getAllMetaRegionLocations(rpcController,
+ GetAllMetaRegionLocationsRequest.newBuilder().setExcludeOfflinedSplitParents(true).build());
List result = new ArrayList<>();
- resp.getMetaLocationsList().forEach(
- location -> result.add(ProtobufUtil.toRegionLocation(location)));
+ resp.getMetaLocationsList()
+ .forEach(location -> result.add(ProtobufUtil.toRegionLocation(location)));
Collections.sort(result);
assertEquals(metaLocations, result);
rpcCount++;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaLocationCache.java
new file mode 100644
index 000000000000..306767ed1ee3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaLocationCache.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocateType;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestMetaLocationCache {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMetaLocationCache.class);
+
+ private static Configuration CONF = HBaseConfiguration.create();
+
+ private static ChoreService CHORE_SERVICE;
+
+ private static byte[] SPLIT = Bytes.toBytes("a");
+
+ private MasterServices master;
+
+ private MetaLocationCache cache;
+
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ CONF.setInt(MetaLocationCache.SYNC_INTERVAL_SECONDS, 1);
+ CHORE_SERVICE = new ChoreService("TestMetaLocationCache");
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ CHORE_SERVICE.shutdown();
+ }
+
+ @Before
+ public void setUp() {
+ master = mock(MasterServices.class);
+ when(master.getConfiguration()).thenReturn(CONF);
+ when(master.getChoreService()).thenReturn(CHORE_SERVICE);
+ cache = new MetaLocationCache(master);
+ }
+
+ @After
+ public void tearDown() {
+ if (cache != null) {
+ cache.stop("test end");
+ }
+ }
+
+ @Test
+ public void testError() throws InterruptedException {
+ AsyncClusterConnection conn = mock(AsyncClusterConnection.class);
+ when(conn.getAllMetaRegionLocations(anyInt()))
+ .thenReturn(FutureUtils.failedFuture(new RuntimeException("inject error")));
+ when(master.getAsyncClusterConnection()).thenReturn(conn);
+ Thread.sleep(2000);
+ assertNull(cache.locateMeta(HConstants.EMPTY_BYTE_ARRAY, RegionLocateType.CURRENT));
+ assertTrue(cache.getAllMetaRegionLocations(true).isEmpty());
+
+ HRegionLocation loc =
+ new HRegionLocation(RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME).build(),
+ ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
+ when(conn.getAllMetaRegionLocations(anyInt()))
+ .thenReturn(CompletableFuture.completedFuture(Arrays.asList(loc)));
+ Thread.sleep(2000);
+ List list = cache.getAllMetaRegionLocations(false);
+ assertEquals(1, list.size());
+ assertEquals(loc, list.get(0));
+ }
+
+ private void prepareData() throws InterruptedException {
+ AsyncClusterConnection conn = mock(AsyncClusterConnection.class);
+ RegionInfo parent = RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME).setSplit(true)
+ .setOffline(true).build();
+ RegionInfo daughter1 =
+ RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME).setEndKey(SPLIT).build();
+ RegionInfo daughter2 =
+ RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME).setStartKey(SPLIT).build();
+ HRegionLocation parentLoc = new HRegionLocation(parent,
+ ServerName.valueOf("127.0.0.1", 12345, System.currentTimeMillis()));
+ HRegionLocation daughter1Loc = new HRegionLocation(daughter1,
+ ServerName.valueOf("127.0.0.2", 12345, System.currentTimeMillis()));
+ HRegionLocation daughter2Loc = new HRegionLocation(daughter2,
+ ServerName.valueOf("127.0.0.3", 12345, System.currentTimeMillis()));
+ when(conn.getAllMetaRegionLocations(anyInt())).thenReturn(
+ CompletableFuture.completedFuture(Arrays.asList(parentLoc, daughter1Loc, daughter2Loc)));
+ when(master.getAsyncClusterConnection()).thenReturn(conn);
+ Thread.sleep(2000);
+ }
+
+ @Test
+ public void testLocateMeta() throws InterruptedException {
+ prepareData();
+ RegionLocations locs = cache.locateMeta(SPLIT, RegionLocateType.BEFORE);
+ assertEquals(1, locs.size());
+ HRegionLocation loc = locs.getDefaultRegionLocation();
+ assertArrayEquals(SPLIT, loc.getRegion().getEndKey());
+
+ locs = cache.locateMeta(SPLIT, RegionLocateType.CURRENT);
+ assertEquals(1, locs.size());
+ loc = locs.getDefaultRegionLocation();
+ assertArrayEquals(SPLIT, loc.getRegion().getStartKey());
+
+ locs = cache.locateMeta(SPLIT, RegionLocateType.AFTER);
+ assertEquals(1, locs.size());
+ loc = locs.getDefaultRegionLocation();
+ assertArrayEquals(SPLIT, loc.getRegion().getStartKey());
+ }
+
+ @Test
+ public void testGetAllMetaRegionLocations() throws InterruptedException {
+ prepareData();
+ List locs = cache.getAllMetaRegionLocations(false);
+ assertEquals(3, locs.size());
+ HRegionLocation loc = locs.get(0);
+ assertTrue(loc.getRegion().isSplitParent());
+ loc = locs.get(1);
+ assertArrayEquals(SPLIT, loc.getRegion().getEndKey());
+ loc = locs.get(2);
+ assertArrayEquals(SPLIT, loc.getRegion().getStartKey());
+
+ locs = cache.getAllMetaRegionLocations(true);
+ assertEquals(2, locs.size());
+ loc = locs.get(0);
+ assertArrayEquals(SPLIT, loc.getRegion().getEndKey());
+ loc = locs.get(1);
+ assertArrayEquals(SPLIT, loc.getRegion().getStartKey());
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionAssignedToMultipleRegionServers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionAssignedToMultipleRegionServers.java
index fc82eb1e8cfc..dac471e50aa9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionAssignedToMultipleRegionServers.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionAssignedToMultipleRegionServers.java
@@ -122,7 +122,6 @@ protected AssignmentManager createAssignmentManager(MasterServices master,
@Override
protected ServerManager createServerManager(MasterServices master) throws IOException {
- setupClusterConnection();
return new ServerManagerForTest(master);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java
index acb45f7a9ed4..41e884dc983f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java
@@ -129,7 +129,6 @@ protected AssignmentManager createAssignmentManager(MasterServices master,
@Override
protected ServerManager createServerManager(MasterServices master) throws IOException {
- setupClusterConnection();
return new ServerManagerForTest(master);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSCPGetRegionsRace.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSCPGetRegionsRace.java
index f5bc86a82b8b..ffca796456c3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSCPGetRegionsRace.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSCPGetRegionsRace.java
@@ -142,7 +142,6 @@ protected AssignmentManager createAssignmentManager(MasterServices master,
@Override
protected ServerManager createServerManager(MasterServices master) throws IOException {
- setupClusterConnection();
return new ServerManagerForTest(master);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java
index 59191a240036..fb0e460df544 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java
@@ -210,7 +210,6 @@ protected AssignmentManager createAssignmentManager(MasterServices master,
@Override
protected ServerManager createServerManager(MasterServices master) throws IOException {
- setupClusterConnection();
return new SMForTest(master);
}
}