Skip to content

Commit

Permalink
HBASE-24459 Move the locateMeta logic from AsyncMetaRegionTableLocato…
Browse files Browse the repository at this point in the history
…r to ConnectionRegistry (#2095)

Signed-off-by: Viraj Jasani <[email protected]>
Signed-off-by: Bharath Vissapragada <[email protected]>
  • Loading branch information
Apache9 committed Sep 10, 2020
1 parent 6610386 commit f74450e
Show file tree
Hide file tree
Showing 27 changed files with 948 additions and 808 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
Expand Down Expand Up @@ -257,10 +256,6 @@ ClientService.Interface getRegionServerStub(ServerName serverName) throws IOExce
() -> createRegionServerStub(serverName));
}

private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}

private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}
Expand All @@ -272,26 +267,8 @@ AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
}

CompletableFuture<MasterService.Interface> getMasterStub() {
return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
addListener(registry.getActiveMaster(), (addr, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (addr == null) {
future.completeExceptionally(new MasterNotRunningException(
"ZooKeeper available but no active master location found"));
} else {
LOG.debug("The fetched master address is {}", addr);
try {
future.complete(createMasterStub(addr));
} catch (IOException e) {
future.completeExceptionally(e);
}
}

});
return future;
}, stub -> true, "master stub");
return ConnectionUtils.getMasterStub(registry, masterStub, masterStubMakeFuture, rpcClient,
user, rpcTimeout, TimeUnit.MILLISECONDS, MasterService::newStub, "MasterService");
}

String getClusterId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,137 +19,40 @@

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;

/**
* The class for locating region for meta table.
*/
@InterfaceAudience.Private
class AsyncMetaTableRegionLocator extends AbstractAsyncTableRegionLocator {

private static final Logger LOG = LoggerFactory.getLogger(AsyncMetaTableRegionLocator.class);

private final AtomicReference<Interface> stub = new AtomicReference<>();

private final AtomicReference<CompletableFuture<Interface>> stubMakeFuture =
new AtomicReference<>();

AsyncMetaTableRegionLocator(AsyncConnectionImpl conn, TableName tableName, int maxConcurrent) {
// for meta region we should use MetaCellComparator to compare the row keys
super(conn, tableName, maxConcurrent, (r1, r2) -> MetaCellComparator
.compareRows(r1, 0, r1.length, r2, 0, r2.length));
}

private Interface createStub(ServerName serverName) throws IOException {
return ClientMetaService.newStub(conn.rpcClient.createRpcChannel(serverName, conn.user,
(int) TimeUnit.NANOSECONDS.toMillis(conn.connConf.getReadRpcTimeoutNs())));
}

CompletableFuture<Interface> getStub() {
return ConnectionUtils.getOrFetch(stub, stubMakeFuture, false, () -> {
CompletableFuture<Interface> future = new CompletableFuture<>();
addListener(conn.registry.getActiveMaster(), (addr, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (addr == null) {
future.completeExceptionally(new MasterNotRunningException(
"ZooKeeper available but no active master location found"));
} else {
LOG.debug("The fetched master address is {}", addr);
try {
future.complete(createStub(addr));
} catch (IOException e) {
future.completeExceptionally(e);
}
}

});
return future;
}, stub -> true, "ClientLocateMetaStub");
}

private void tryClearMasterStubCache(IOException error, Interface currentStub) {
if (ClientExceptionsUtil.isConnectionException(error) ||
error instanceof ServerNotRunningYetException) {
stub.compareAndSet(currentStub, null);
}
super(conn, tableName, maxConcurrent, MetaCellComparator.ROW_COMPARATOR);
}

@Override
protected void locate(LocateRequest req) {
addListener(getStub(), (stub, error) -> {
addListener(conn.registry.locateMeta(req.row, req.locateType), (locs, error) -> {
if (error != null) {
onLocateComplete(req, null, error);
return;
}
HBaseRpcController controller = conn.rpcControllerFactory.newController();
stub.locateMetaRegion(controller,
LocateMetaRegionRequest.newBuilder().setRow(ByteString.copyFrom(req.row))
.setLocateType(ProtobufUtil.toProtoRegionLocateType(req.locateType)).build(),
resp -> {
if (controller.failed()) {
IOException ex = controller.getFailed();
tryClearMasterStubCache(ex, stub);
onLocateComplete(req, null, ex);
return;
}
RegionLocations locs = new RegionLocations(resp.getMetaLocationsList().stream()
.map(ProtobufUtil::toRegionLocation).collect(Collectors.toList()));
if (validateRegionLocations(locs, req)) {
onLocateComplete(req, locs, null);
}
});
if (validateRegionLocations(locs, req)) {
onLocateComplete(req, locs, null);
}
});
}

@Override
CompletableFuture<List<HRegionLocation>>
getAllRegionLocations(boolean excludeOfflinedSplitParents) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
addListener(getStub(), (stub, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
HBaseRpcController controller = conn.rpcControllerFactory.newController();
stub.getAllMetaRegionLocations(controller, GetAllMetaRegionLocationsRequest.newBuilder()
.setExcludeOfflinedSplitParents(excludeOfflinedSplitParents).build(), resp -> {
if (controller.failed()) {
IOException ex = controller.getFailed();
tryClearMasterStubCache(ex, stub);
future.completeExceptionally(ex);
return;
}
List<HRegionLocation> locs = resp.getMetaLocationsList().stream()
.map(ProtobufUtil::toRegionLocation).collect(Collectors.toList());
future.complete(locs);
});
});
return future;
return conn.registry.getAllMetaRegionLocations(excludeOfflinedSplitParents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,36 @@
package org.apache.hadoop.hbase.client;

import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Registry for meta information needed for connection setup to a HBase cluster. Implementations
* hold cluster information such as this cluster's id, location of hbase:meta, etc..
* Internal use only.
* hold cluster information such as this cluster's id, location of hbase:meta, etc.. Internal use
* only.
*/
@InterfaceAudience.Private
interface ConnectionRegistry extends Closeable {

/**
* Get location of meta region for the given {@code row}.
*/
CompletableFuture<RegionLocations> locateMeta(byte[] row, RegionLocateType locateType);

/**
* Get all meta region locations, including the location of secondary regions.
* @param excludeOfflinedSplitParents whether to include split parent.
*/
CompletableFuture<List<HRegionLocation>>
getAllMetaRegionLocations(boolean excludeOfflinedSplitParents);

/**
* Should only be called once.
* <p>
* <p/>
* The upper layer should store this value somewhere as it will not be change any more.
*/
CompletableFuture<String> getClusterId();
Expand Down
Loading

0 comments on commit f74450e

Please sign in to comment.