Skip to content

Commit

Permalink
HBASE-21580 Support getting Hbck instance from AsyncConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Jan 11, 2019
1 parent 5d32e80 commit 7bebdff
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
package org.apache.hadoop.hbase.client;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -181,4 +185,33 @@ default AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorSer
* @param pool the thread pool to use for executing callback
*/
AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, ExecutorService pool);

/**
* Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
* be thread-safe. A new instance should be created by each thread. This is a lightweight
* operation. Pooling or caching of the returned Hbck instance is not recommended.
* <p/>
* The caller is responsible for calling {@link Hbck#close()} on the returned Hbck instance.
* <p/>
* This will be used mostly by hbck tool.
* @return an Hbck instance for active master. Active master is fetched from the zookeeper.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
CompletableFuture<Hbck> getHbck();

/**
* Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
* be thread-safe. A new instance should be created by each thread. This is a lightweight
* operation. Pooling or caching of the returned Hbck instance is not recommended.
* <p/>
* The caller is responsible for calling {@link Hbck#close()} on the returned Hbck instance.
* <p/>
* This will be used mostly by hbck tool. This may only be used to by pass getting registered
* master from ZK. In situations where ZK is not available or active master is not registered with
* ZK and user can get master address by other means, master can be explicitly specified.
* @param masterServer explicit {@link ServerName} for master server
* @return an Hbck instance for a specified master server
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
Hbck getHbck(ServerName masterServer) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,7 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;

import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
Expand All @@ -35,28 +29,35 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.Threads;

/**
* The implementation of AsyncConnection.
Expand Down Expand Up @@ -325,4 +326,30 @@ public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
RETRY_TIMER);
}

@Override
public CompletableFuture<Hbck> getHbck() {
CompletableFuture<Hbck> future = new CompletableFuture<>();
addListener(registry.getMasterAddress(), (sn, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {
try {
future.complete(getHbck(sn));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
});
return future;
}

@Override
public Hbck getHbck(ServerName masterServer) throws IOException {
// we will not create a new connection when creating a new protobuf stub, and for hbck there
// will be no performance consideration, so for simplification we will create a new stub every
// time instead of caching the stub here.
return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -438,14 +438,14 @@ public Hbck getHbck(ServerName masterServer) throws IOException {
throw new RegionServerStoppedException(masterServer + " is dead.");
}
String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
masterServer, this.hostnamesCanChange);

return new HBaseHbck(this,
(MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
BlockingRpcChannel channel =
this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
return MasterProtos.HbckService.newBlockingStub(channel);
}));
masterServer, this.hostnamesCanChange);

return new HBaseHbck(
(MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
BlockingRpcChannel channel =
this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
return MasterProtos.HbckService.newBlockingStub(channel);
}), rpcControllerFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -67,9 +67,9 @@ public class HBaseHbck implements Hbck {

private RpcControllerFactory rpcControllerFactory;

HBaseHbck(ClusterConnection connection, BlockingInterface hbck) throws IOException {
HBaseHbck(BlockingInterface hbck, RpcControllerFactory rpcControllerFactory) {
this.hbck = hbck;
this.rpcControllerFactory = connection.getRpcControllerFactory();
this.rpcControllerFactory = rpcControllerFactory;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
Expand All @@ -49,16 +47,23 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
* Class to test HBaseHbck.
* Spins up the minicluster once at test start and then takes it down afterward.
* Add any testing of HBaseHbck functionality here.
*/
@RunWith(Parameterized.class)
@Category({LargeTests.class, ClientTests.class})
public class TestHbck {
@ClassRule
Expand All @@ -71,19 +76,39 @@ public class TestHbck {
@Rule
public TestName name = new TestName();

@Parameter
public boolean async;

private static final TableName TABLE_NAME = TableName.valueOf(TestHbck.class.getSimpleName());

private static ProcedureExecutor<MasterProcedureEnv> procExec;

private static AsyncConnection ASYNC_CONN;

@Parameters(name = "{index}: async={0}")
public static List<Object[]> params() {
return Arrays.asList(new Object[] { false }, new Object[] { true });
}

private Hbck getHbck() throws Exception {
if (async) {
return ASYNC_CONN.getHbck().get();
} else {
return TEST_UTIL.getHbck();
}
}

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("family1"), 5);
procExec = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
Closeables.close(ASYNC_CONN, true);
TEST_UTIL.shutdownMiniCluster();
}

Expand Down Expand Up @@ -120,16 +145,15 @@ public void testBypassProcedure() throws Exception {

//bypass the procedure
List<Long> pids = Arrays.<Long>asList(procId);
List<Boolean> results =
TEST_UTIL.getHbck().bypassProcedure(pids, 30000, false, false);
List<Boolean> results = getHbck().bypassProcedure(pids, 30000, false, false);
assertTrue("Failed to by pass procedure!", results.get(0));
TEST_UTIL.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
LOG.info("{} finished", proc);
}

@Test
public void testSetTableStateInMeta() throws IOException {
Hbck hbck = TEST_UTIL.getHbck();
public void testSetTableStateInMeta() throws Exception {
Hbck hbck = getHbck();
// set table state to DISABLED
hbck.setTableStateInMeta(new TableState(TABLE_NAME, TableState.State.DISABLED));
// Method {@link Hbck#setTableStateInMeta()} returns previous state, which in this case
Expand All @@ -141,8 +165,8 @@ public void testSetTableStateInMeta() throws IOException {
}

@Test
public void testAssigns() throws IOException {
Hbck hbck = TEST_UTIL.getHbck();
public void testAssigns() throws Exception {
Hbck hbck = getHbck();
try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
for (RegionInfo ri: regions) {
Expand Down Expand Up @@ -183,7 +207,7 @@ public void testScheduleSCP() throws Exception {
TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), Bytes.toBytes("family1"),
true);
ServerName serverName = testRs.getServerName();
Hbck hbck = TEST_UTIL.getHbck();
Hbck hbck = getHbck();
List<Long> pids =
hbck.scheduleServerCrashProcedure(Arrays.asList(ProtobufUtil.toServerName(serverName)));
assertTrue(pids.get(0) > 0);
Expand Down

0 comments on commit 7bebdff

Please sign in to comment.