Skip to content

Commit

Permalink
HBASE-25032 Do not assign regions to region server which has not call…
Browse files Browse the repository at this point in the history
…ed regionServerReport yet (#3268)

Signed-off-by: Bharath Vissapragada <[email protected]>
Signed-off-by: Michael Stack <[email protected]>
  • Loading branch information
Apache9 committed May 18, 2021
1 parent 1a8d3c3 commit 8110b18
Show file tree
Hide file tree
Showing 9 changed files with 345 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public Map<String, List<ReplicationLoadSource>> getReplicationLoadSourceMap(){
Map<String,List<ReplicationLoadSource>> sourcesMap = new HashMap<>();
for(ReplicationLoadSource loadSource : sources){
sourcesMap.computeIfAbsent(loadSource.getPeerID(),
peerId -> new ArrayList()).add(loadSource);
peerId -> new ArrayList<>()).add(loadSource);
}
return sourcesMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {

private MasterServices masterServices;
private volatile RSGroupInfoManager rsGroupInfoManager;
private LoadBalancer internalBalancer;
private volatile LoadBalancer internalBalancer;

/**
* Set this key to {@code true} to allow region fallback.
Expand All @@ -84,7 +84,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
*/
public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable";

private boolean fallbackEnabled = false;
private volatile boolean fallbackEnabled = false;

/**
* Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
Expand All @@ -39,6 +40,7 @@
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.MetaRWQueueRpcExecutor;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.testclassification.LargeTests;
Expand Down Expand Up @@ -68,6 +70,11 @@ public class TestRSGroupsKillRS extends TestRSGroupsBase {

@BeforeClass
public static void setUp() throws Exception {
// avoid all the handlers blocked when meta is offline, and regionServerReport can not be
// processed which causes dead lock.
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
TEST_UTIL.getConfiguration()
.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
setUpTestBeforeClass();
}

Expand Down Expand Up @@ -261,8 +268,7 @@ public void testLowerMetaGroupVersion() throws Exception{
assertTrue(majorVersion >= 1);
String lowerVersion = String.valueOf(majorVersion - 1) + originVersion.split("\\.")[1];
setFinalStatic(Version.class.getField("version"), lowerVersion);
TEST_UTIL.getMiniHBaseCluster().startRegionServer(address.getHostname(),
address.getPort());
TEST_UTIL.getMiniHBaseCluster().startRegionServer(address.getHostName(), address.getPort());
assertEquals(NUM_SLAVES_BASE,
TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size());
assertTrue(VersionInfo.compareVersion(originVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -62,7 +63,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
Expand Down Expand Up @@ -951,11 +951,19 @@ public void stop() {

/**
* Creates a list of possible destinations for a region. It contains the online servers, but not
* the draining or dying servers.
* @param serversToExclude can be null if there is no server to exclude
* the draining or dying servers.
* @param serversToExclude can be null if there is no server to exclude
*/
public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude){
final List<ServerName> destServers = getOnlineServersList();
public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
Set<ServerName> destServers = new HashSet<>();
onlineServers.forEach((sn, sm) -> {
if (sm.getLastReportTimestamp() > 0) {
// This means we have already called regionServerReport at leaset once, then let's include
// this server for region assignment. This is an optimization to avoid assigning regions to
// an uninitialized server. See HBASE-25032 for more details.
destServers.add(sn);
}
});

if (serversToExclude != null) {
destServers.removeAll(serversToExclude);
Expand All @@ -965,7 +973,7 @@ public List<ServerName> createDestinationServersList(final List<ServerName> serv
final List<ServerName> drainingServersCopy = getDrainingServersList();
destServers.removeAll(drainingServersCopy);

return destServers;
return new ArrayList<>(destServers);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
Expand All @@ -37,9 +42,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;

@Category({MasterTests.class, MediumTests.class})
public class TestMasterMetrics {
Expand All @@ -66,6 +76,32 @@ protected void tryRegionServerReport(
// do nothing
}

@Override
protected RSRpcServices createRpcServices() throws IOException {
return new MasterRpcServices(this) {

@Override
public RegionServerStartupResponse regionServerStartup(RpcController controller,
RegionServerStartupRequest request) throws ServiceException {
RegionServerStartupResponse resp = super.regionServerStartup(controller, request);
ServerManager serverManager = getServerManager();
// to let the region server actual online otherwise we can not assign meta region
new HashMap<>(serverManager.getOnlineServers()).forEach((sn, sm) -> {
if (sm.getLastReportTimestamp() <= 0) {
try {
serverManager.regionServerReport(sn,
ServerMetricsBuilder.newBuilder(sn).setVersionNumber(sm.getVersionNumber())
.setVersion(sm.getVersion()).setLastReportTimestamp(System.currentTimeMillis())
.build());
} catch (YouAreDeadException e) {
throw new UncheckedIOException(e);
}
}
});
return resp;
}
};
}
}

@BeforeClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ public void start(final int numServes, final RSProcedureDispatcher remoteDispatc
this.assignmentManager.start();
for (int i = 0; i < numServes; ++i) {
ServerName sn = ServerName.valueOf("localhost", 100 + i, 1);
serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn));
serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn)
.setLastReportTimestamp(System.currentTimeMillis()).build());
}
this.procedureExecutor.getEnvironment().setEventReady(initialized, true);
}
Expand All @@ -182,7 +183,8 @@ public void restartRegionServer(ServerName serverName) throws IOException {
return;
}
ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode);
serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn));
serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn)
.setLastReportTimestamp(System.currentTimeMillis()).build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.assignment;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;

/**
* UT for HBASE-25032.
*/
@Category({ MasterTests.class, MediumTests.class })
public class TestAssignRegionToUninitializedRegionServer {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAssignRegionToUninitializedRegionServer.class);

private static CountDownLatch ARRIVE;

private static CountDownLatch RESUME;

private static AtomicBoolean ASSIGN_CALLED = new AtomicBoolean(false);

public static final class RSRpcServicesForTest extends RSRpcServices {

public RSRpcServicesForTest(HRegionServer rs) throws IOException {
super(rs);
}

@Override
public ExecuteProceduresResponse executeProcedures(RpcController controller,
ExecuteProceduresRequest request) throws ServiceException {
if (request.getOpenRegionCount() > 0) {
ASSIGN_CALLED.set(true);
}
return super.executeProcedures(controller, request);
}
}

public static final class RegionServerForTest extends HRegionServer {

public RegionServerForTest(Configuration conf) throws IOException {
super(conf);
}

@Override
protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
throws IOException {
if (ARRIVE != null) {
ARRIVE.countDown();
ARRIVE = null;
try {
RESUME.await();
} catch (InterruptedException e) {
}
}
super.tryRegionServerReport(reportStartTime, reportEndTime);
}

@Override
protected RSRpcServices createRpcServices() throws IOException {
return new RSRpcServicesForTest(this);
}
}

private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

private static TableName NAME = TableName.valueOf("test");

private static byte[] FAMILY = Bytes.toBytes("family");

@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniCluster(1);
UTIL.createTable(NAME, FAMILY);
UTIL.waitTableAvailable(NAME);
}

@AfterClass
public static void tearDown() throws IOException {
UTIL.shutdownMiniCluster();
}

@Test
public void testMove() throws Exception {
UTIL.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL,
RegionServerForTest.class, HRegionServer.class);
CountDownLatch arrive = new CountDownLatch(1);
ARRIVE = arrive;
RESUME = new CountDownLatch(1);
// restart a new region server, and wait until it finish initialization and want to call
// regionServerReport, so it will load the peer state to peer cache.
Future<HRegionServer> regionServerFuture = ForkJoinPool.commonPool()
.submit(() -> UTIL.getMiniHBaseCluster().startRegionServer().getRegionServer());
ARRIVE.await();
// try move region to the new region server, it will fail, but we need to make sure that we do
// not try to assign it to the new server.
HRegionServer src = UTIL.getRSForFirstRegionInTable(NAME);
HRegionServer dst = UTIL.getOtherRegionServer(src);
try {
UTIL.getAdmin().move(UTIL.getAdmin().getRegions(NAME).get(0).getEncodedNameAsBytes(),
dst.getServerName());
// assert the region should still on the original region server, and we didn't call assign to
// the new server
assertSame(src, UTIL.getRSForFirstRegionInTable(NAME));
assertFalse(ASSIGN_CALLED.get());
} finally {
// let the region server go
RESUME.countDown();
}
// wait the new region server online
assertSame(dst, regionServerFuture.get());
// try move again
UTIL.getAdmin().move(UTIL.getAdmin().getRegions(NAME).get(0).getEncodedNameAsBytes(),
dst.getServerName());
// this time the region should be on the new region server
assertSame(dst, UTIL.getRSForFirstRegionInTable(NAME));
assertTrue(ASSIGN_CALLED.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ protected void doCrash(final ServerName serverName) {
ServerName newSn = ServerName.valueOf("localhost", 10000 + newRsAdded, 1);
newRsAdded++;
try {
this.master.getServerManager().regionServerReport(newSn, ServerMetricsBuilder.of(newSn));
this.master.getServerManager().regionServerReport(newSn, ServerMetricsBuilder
.newBuilder(newSn).setLastReportTimestamp(System.currentTimeMillis()).build());
} catch (YouAreDeadException e) {
// should not happen
throw new UncheckedIOException(e);
Expand Down
Loading

0 comments on commit 8110b18

Please sign in to comment.