Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-22662 Move RSGroupInfoManager to hbase-server #368

Merged
merged 1 commit into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions hbase-rsgroup/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-shaded-miscellaneous</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List
public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState)
throws HBaseIOException {
if (!isOnline()) {
throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
" is not online, unable to perform balance");
throw new ConstraintException(
RSGroupInfoManager.class.getSimpleName() + " is not online, unable to perform balance");
}

// Calculate correct assignments and a list of RegionPlan for mis-placed regions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public boolean evaluate() throws Exception {
}
});
// Move table to group and wait.
groupAdmin.moveTables(Sets.newHashSet(RSGroupInfoManager.RSGROUP_TABLE_NAME), newGroup);
groupAdmin.moveTables(Sets.newHashSet(RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME), newGroup);
LOG.info("Waiting for move table...");
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
Expand Down Expand Up @@ -169,7 +169,7 @@ public boolean evaluate() throws Exception {
// Make sure balancer is in offline mode, since this is what we're testing.
assertFalse(groupMgr.isOnline());
// Verify the group affiliation that's loaded from ZK instead of tables.
assertEquals(newGroup, groupMgr.getRSGroupOfTable(RSGroupInfoManager.RSGROUP_TABLE_NAME));
assertEquals(newGroup, groupMgr.getRSGroupOfTable(RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME));
assertEquals(RSGroupInfo.DEFAULT_GROUP, groupMgr.getRSGroupOfTable(failoverTable));
// Kill final regionserver to see the failover happens for all tables except GROUP table since
// it's group does not have any online RS.
Expand All @@ -182,7 +182,7 @@ public boolean evaluate() throws Exception {
return failoverRS.getRegions(failoverTable).size() >= 1;
}
});
Assert.assertEquals(0, failoverRS.getRegions(RSGroupInfoManager.RSGROUP_TABLE_NAME).size());
Assert.assertEquals(0, failoverRS.getRegions(RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME).size());

// Need this for minicluster to shutdown cleanly.
master.stopMaster();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public VerifyingRSGroupAdminClient(RSGroupAdmin RSGroupAdmin, Configuration conf
throws IOException {
wrapped = RSGroupAdmin;
table = ConnectionFactory.createConnection(conf)
.getTable(RSGroupInfoManager.RSGROUP_TABLE_NAME);
.getTable(RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME);
zkw = new ZKWatcher(conf, this.getClass().getSimpleName(), null);
}

Expand Down Expand Up @@ -126,8 +126,8 @@ public void verify() throws IOException {
RSGroupProtos.RSGroupInfo proto =
RSGroupProtos.RSGroupInfo.parseFrom(
result.getValue(
RSGroupInfoManager.META_FAMILY_BYTES,
RSGroupInfoManager.META_QUALIFIER_BYTES));
RSGroupInfoManagerImpl.META_FAMILY_BYTES,
RSGroupInfoManagerImpl.META_QUALIFIER_BYTES));
groupMap.put(proto.getName(), ProtobufUtil.toGroupInfo(proto));
}
Assert.assertEquals(Sets.newHashSet(groupMap.values()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.rsgroup;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Interface used to manage RSGroupInfo storage. An implementation
* has the option to support offline mode.
* See {@link RSGroupBasedLoadBalancer}
* Interface used to manage RSGroupInfo storage. An implementation has the option to support offline
* mode. See {@code RSGroupBasedLoadBalancer}.
*/
@InterfaceAudience.Private
public interface RSGroupInfoManager {

String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;

//Assigned before user tables
TableName RSGROUP_TABLE_NAME =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");
String rsGroupZNode = "rsgroup";
byte[] META_FAMILY_BYTES = Bytes.toBytes("m");
byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i");
byte[] ROW_KEY = {0};

void start();

/**
Expand Down Expand Up @@ -86,7 +70,6 @@ Set<Address> moveServers(Set<Address> servers, String srcGroup, String dstGroup)

/**
* Set the group membership of a set of tables
*
* @param tableNames set of tables to move
* @param groupName name of group of tables to move to
*/
Expand All @@ -104,7 +87,6 @@ Set<Address> moveServers(Set<Address> servers, String srcGroup, String dstGroup)

/**
* Whether the manager is able to fully return group metadata
*
* @return whether the manager is in online mode
*/
boolean isOnline();
Expand All @@ -116,8 +98,8 @@ Set<Address> moveServers(Set<Address> servers, String srcGroup, String dstGroup)
* @param srcGroup groupName being moved from
* @param dstGroup groupName being moved to
*/
void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
String srcGroup, String dstGroup) throws IOException;
void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String srcGroup,
String dstGroup) throws IOException;

/**
* Remove decommissioned servers from rsgroup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.rsgroup;

import com.google.protobuf.ServiceException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -36,10 +34,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
Expand All @@ -48,14 +48,11 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
Expand All @@ -66,10 +63,14 @@
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
Expand All @@ -79,6 +80,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
Expand All @@ -91,13 +93,13 @@
* RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
* too (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
* zk) on each modification.
* <p>
* <p/>
* Mutations on state are synchronized but reads can continue without having to wait on an instance
* monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
* state are read-only, just-in-case (see flushConfig).
* <p>
* <p/>
* Reads must not block else there is a danger we'll deadlock.
* <p>
* <p/>
* Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
* on the results of the query modifying cache in zookeeper without another thread making
* intermediate modifications. These clients synchronize on the 'this' instance so no other has
Expand All @@ -107,6 +109,24 @@
final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);

private static final String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
private static final long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;

// Assigned before user tables
@VisibleForTesting
static final TableName RSGROUP_TABLE_NAME =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");

private static final String RS_GROUP_ZNODE = "rsgroup";

@VisibleForTesting
static final byte[] META_FAMILY_BYTES = Bytes.toBytes("m");

@VisibleForTesting
static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i");

private static final byte[] ROW_KEY = { 0 };

/** Table descriptor for <code>hbase:rsgroup</code> catalog table */
private static final TableDescriptor RSGROUP_TABLE_DESC;
static {
Expand All @@ -129,7 +149,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private volatile Map<TableName, String> tableMap = Collections.emptyMap();

private final MasterServices masterServices;
private final Connection conn;
private final AsyncClusterConnection conn;
private final ZKWatcher watcher;
private final RSGroupStartupWorker rsGroupStartupWorker;
// contains list of groups that were last flushed to persistent store
Expand All @@ -141,7 +161,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
this.masterServices = masterServices;
this.watcher = masterServices.getZooKeeper();
this.conn = masterServices.getConnection();
this.conn = masterServices.getAsyncClusterConnection();
this.rsGroupStartupWorker = new RSGroupStartupWorker();
}

Expand Down Expand Up @@ -357,25 +377,25 @@ public synchronized void removeServers(Set<Address> servers) throws IOException
}
}

List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
private List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
ResultScanner scanner = table.getScanner(new Scan())) {
AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME);
try (ResultScanner scanner = table.getScanner(META_FAMILY_BYTES, META_QUALIFIER_BYTES)) {
for (Result result;;) {
result = scanner.next();
if (result == null) {
break;
}
RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
.parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
.parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
rsGroupInfoList.add(ProtobufUtil.toGroupInfo(proto));
}
}
return rsGroupInfoList;
}

List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
// Overwrite any info stored by table, this takes precedence
try {
Expand Down Expand Up @@ -527,7 +547,8 @@ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) thro
resetRSGroupAndTableMaps(newGroupMap, newTableMap);

try {
String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
String groupBasePath =
ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);

List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
Expand Down Expand Up @@ -790,11 +811,8 @@ private boolean waitForGroupTableOnline() {
createRSGroupTable();
}
// try reading from the table
try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
table.get(new Get(ROW_KEY));
}
LOG.info(
"RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information");
FutureUtils.get(conn.getTable(RSGROUP_TABLE_NAME).get(new Get(ROW_KEY)));
LOG.info("RSGroup table={} is online, refreshing cached information", RSGROUP_TABLE_NAME);
RSGroupInfoManagerImpl.this.refresh(true);
online = true;
// flush any inconsistencies between ZK and HTable
Expand Down Expand Up @@ -836,8 +854,8 @@ private void createRSGroupTable() throws IOException {
} else {
Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
if (result != null && result.isFailed()) {
throw new IOException(
"Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
throw new IOException("Failed to create group table. " +
MasterProcedureUtil.unwrapRemoteIOException(result));
}
}
}
Expand All @@ -852,33 +870,24 @@ private static boolean isMasterRunning(MasterServices masterServices) {
}

private void multiMutate(List<Mutation> mutations) throws IOException {
try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
MultiRowMutationProtos.MutateRowsRequest.newBuilder();
for (Mutation mutation : mutations) {
if (mutation instanceof Put) {
mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
mutation));
} else if (mutation instanceof Delete) {
mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
mutation));
} else {
throw new DoNotRetryIOException(
MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
for (Mutation mutation : mutations) {
if (mutation instanceof Put) {
builder
.addMutationRequest(ProtobufUtil.toMutation(MutationProto.MutationType.PUT, mutation));
} else if (mutation instanceof Delete) {
builder.addMutationRequest(
ProtobufUtil.toMutation(MutationProto.MutationType.DELETE, mutation));
} else {
throw new DoNotRetryIOException(
"multiMutate doesn't support " + mutation.getClass().getName());
}
}

MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
try {
service.mutateRows(null, mmrBuilder.build());
} catch (ServiceException ex) {
ProtobufUtil.toIOException(ex);
}
}
MutateRowsRequest request = builder.build();
AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME);
FutureUtils.get(table.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
MultiRowMutationService::newStub,
(stub, controller, done) -> stub.mutateRows(controller, request, done), ROW_KEY));
}

private void checkGroupName(String groupName) throws ConstraintException {
Expand Down