Skip to content

Commit

Permalink
HBASE-24459 Move the locateMeta logic from AsyncMetaRegionTableLocato…
Browse files Browse the repository at this point in the history
…r to ConnectionRegistry
  • Loading branch information
Apache9 committed Jul 20, 2020
1 parent ef5b91b commit 488a9b7
Show file tree
Hide file tree
Showing 25 changed files with 764 additions and 807 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
Expand Down Expand Up @@ -240,10 +239,6 @@ ClientService.Interface getRegionServerStub(ServerName serverName) throws IOExce
() -> createRegionServerStub(serverName));
}

private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}

private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}
Expand All @@ -255,26 +250,8 @@ AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
}

CompletableFuture<MasterService.Interface> getMasterStub() {
return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
addListener(registry.getActiveMaster(), (addr, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (addr == null) {
future.completeExceptionally(new MasterNotRunningException(
"ZooKeeper available but no active master location found"));
} else {
LOG.debug("The fetched master address is {}", addr);
try {
future.complete(createMasterStub(addr));
} catch (IOException e) {
future.completeExceptionally(e);
}
}

});
return future;
}, stub -> true, "master stub");
return ConnectionUtils.getMasterStub(registry, masterStub, masterStubMakeFuture, rpcClient,
user, rpcTimeout, TimeUnit.MILLISECONDS, MasterService::newStub, "MasterService");
}

String getClusterId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,134 +19,38 @@

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;

/**
* The class for locating region for meta table.
*/
@InterfaceAudience.Private
class AsyncMetaTableRegionLocator extends AbstractAsyncTableRegionLocator {

private static final Logger LOG = LoggerFactory.getLogger(AsyncMetaTableRegionLocator.class);

private final AtomicReference<Interface> stub = new AtomicReference<>();

private final AtomicReference<CompletableFuture<Interface>> stubMakeFuture =
new AtomicReference<>();

AsyncMetaTableRegionLocator(AsyncConnectionImpl conn, TableName tableName, int maxConcurrent) {
super(conn, tableName, maxConcurrent);
}

private Interface createStub(ServerName serverName) throws IOException {
return ClientMetaService.newStub(conn.rpcClient.createRpcChannel(serverName, conn.user,
(int) TimeUnit.NANOSECONDS.toMillis(conn.connConf.getReadRpcTimeoutNs())));
}

CompletableFuture<Interface> getStub() {
return ConnectionUtils.getOrFetch(stub, stubMakeFuture, false, () -> {
CompletableFuture<Interface> future = new CompletableFuture<>();
addListener(conn.registry.getActiveMaster(), (addr, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (addr == null) {
future.completeExceptionally(new MasterNotRunningException(
"ZooKeeper available but no active master location found"));
} else {
LOG.debug("The fetched master address is {}", addr);
try {
future.complete(createStub(addr));
} catch (IOException e) {
future.completeExceptionally(e);
}
}

});
return future;
}, stub -> true, "ClientLocateMetaStub");
}

private void tryClearMasterStubCache(IOException error, Interface currentStub) {
if (ClientExceptionsUtil.isConnectionException(error) ||
error instanceof ServerNotRunningYetException) {
stub.compareAndSet(currentStub, null);
}
}

@Override
protected void locate(LocateRequest req) {
addListener(getStub(), (stub, error) -> {
addListener(conn.registry.locateMeta(req.row, req.locateType), (locs, error) -> {
if (error != null) {
onLocateComplete(req, null, error);
return;
}
HBaseRpcController controller = conn.rpcControllerFactory.newController();
stub.locateMetaRegion(controller,
LocateMetaRegionRequest.newBuilder().setRow(ByteString.copyFrom(req.row))
.setLocateType(ProtobufUtil.toProtoRegionLocateType(req.locateType)).build(),
resp -> {
if (controller.failed()) {
IOException ex = controller.getFailed();
tryClearMasterStubCache(ex, stub);
onLocateComplete(req, null, ex);
return;
}
RegionLocations locs = new RegionLocations(resp.getMetaLocationsList().stream()
.map(ProtobufUtil::toRegionLocation).collect(Collectors.toList()));
if (validateRegionLocations(locs, req)) {
onLocateComplete(req, locs, null);
}
});
if (validateRegionLocations(locs, req)) {
onLocateComplete(req, locs, null);
}
});
}

@Override
CompletableFuture<List<HRegionLocation>>
getAllRegionLocations(boolean excludeOfflinedSplitParents) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
addListener(getStub(), (stub, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
HBaseRpcController controller = conn.rpcControllerFactory.newController();
stub.getAllMetaRegionLocations(controller, GetAllMetaRegionLocationsRequest.newBuilder()
.setExcludeOfflinedSplitParents(excludeOfflinedSplitParents).build(), resp -> {
if (controller.failed()) {
IOException ex = controller.getFailed();
tryClearMasterStubCache(ex, stub);
future.completeExceptionally(ex);
return;
}
List<HRegionLocation> locs = resp.getMetaLocationsList().stream()
.map(ProtobufUtil::toRegionLocation).collect(Collectors.toList());
future.complete(locs);
});
});
return future;
return conn.registry.getAllMetaRegionLocations(excludeOfflinedSplitParents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,36 @@
package org.apache.hadoop.hbase.client;

import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Registry for meta information needed for connection setup to a HBase cluster. Implementations
* hold cluster information such as this cluster's id, location of hbase:meta, etc..
* Internal use only.
* hold cluster information such as this cluster's id, location of hbase:meta, etc.. Internal use
* only.
*/
@InterfaceAudience.Private
interface ConnectionRegistry extends Closeable {

/**
* Get location of meta region for the given {@code row}.
*/
CompletableFuture<RegionLocations> locateMeta(byte[] row, RegionLocateType locateType);

/**
* Get all meta region locations, including the location of secondary regions.
* @param excludeOfflinedSplitParents whether to include split parent.
*/
CompletableFuture<List<HRegionLocation>>
getAllMetaRegionLocations(boolean excludeOfflinedSplitParents);

/**
* Should only be called once.
* <p>
* <p/>
* The upper layer should store this value somewhere as it will not be change any more.
*/
CompletableFuture<String> getClusterId();
Expand Down
Loading

0 comments on commit 488a9b7

Please sign in to comment.