Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28342 Decommissioned hosts should be rejected by the HMaster #5681

Merged
merged 3 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -1622,6 +1622,20 @@ public enum OperationStatusCode {
*/
public final static boolean HBASE_SERVER_USEIP_ENABLED_DEFAULT = false;

/**
* Should the HMaster reject hosts of decommissioned RegionServers, bypass matching their port and
* startcode parts of their ServerName or not? When True, the HMaster will reject a RegionServer's
* request to `reportForDuty` if it's hostname exists in the list of decommissioned RegionServers
* it maintains internally. Added in HBASE-28342.
*/
public final static String REJECT_DECOMMISSIONED_HOSTS_KEY =
"hbase.master.reject.decommissioned.hosts";

/**
* Default value of {@link #REJECT_DECOMMISSIONED_HOSTS_KEY}
*/
public final static boolean REJECT_DECOMMISSIONED_HOSTS_DEFAULT = false;

private HConstants() {
// Can't be instantiated with this ctor.
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class DecommissionedHostRejectedException extends HBaseIOException {
public DecommissionedHostRejectedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ public HMaster(final Configuration conf) throws IOException {
HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);

// Do we publish the status?

boolean shouldPublish =
conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
Expand Down Expand Up @@ -997,7 +996,10 @@ private void finishActiveMasterInitialization() throws IOException, InterruptedE
masterRegion = MasterRegionFactory.create(this);
rsListStorage = new MasterRegionServerList(masterRegion, this);

// Initialize the ServerManager and register it as a configuration observer
this.serverManager = createServerManager(this, rsListStorage);
this.configurationManager.registerObserver(this.serverManager);

this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
if (
!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
Expand All @@ -51,6 +52,7 @@
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
Expand Down Expand Up @@ -100,7 +102,7 @@
* only after the handler is fully enabled and has completed the handling.
*/
@InterfaceAudience.Private
public class ServerManager {
public class ServerManager implements ConfigurationObserver {
public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
"hbase.master.wait.on.regionservers.maxtostart";

Expand Down Expand Up @@ -172,6 +174,9 @@ public class ServerManager {
/** Listeners that are called on server events. */
private List<ServerListener> listeners = new CopyOnWriteArrayList<>();

/** Configured value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY */
private volatile boolean rejectDecommissionedHostsConfig;

/**
* Constructor.
*/
Expand All @@ -183,6 +188,35 @@ public ServerManager(final MasterServices master, RegionServerList storage) {
warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
persistFlushedSequenceId =
c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
rejectDecommissionedHostsConfig = getRejectDecommissionedHostsConfig(c);
}

/**
* Implementation of the ConfigurationObserver interface. We are interested in live-loading the
* configuration value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY
* @param conf Server configuration instance
*/
@Override
public void onConfigurationChange(Configuration conf) {
final boolean newValue = getRejectDecommissionedHostsConfig(conf);
if (rejectDecommissionedHostsConfig == newValue) {
// no-op
return;
}

LOG.info("Config Reload for RejectDecommissionedHosts. previous value: {}, new value: {}",
rejectDecommissionedHostsConfig, newValue);

rejectDecommissionedHostsConfig = newValue;
}

/**
* Reads the value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY from the config and returns it
* @param conf Configuration instance of the Master
*/
public boolean getRejectDecommissionedHostsConfig(Configuration conf) {
return conf.getBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY,
HConstants.REJECT_DECOMMISSIONED_HOSTS_DEFAULT);
}

/**
Expand Down Expand Up @@ -227,11 +261,14 @@ ServerName regionServerStartup(RegionServerStartupRequest request, int versionNu
final String hostname =
request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : isaHostName;
ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());

// Check if the host should be rejected based on it's decommissioned status
checkRejectableDecommissionedStatus(sn);

checkClockSkew(sn, request.getServerCurrentTime());
checkIsDead(sn, "STARTUP");
if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
LOG.warn(
"THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup could not record the server: {}", sn);
}
storage.started(sn);
return sn;
Expand Down Expand Up @@ -293,6 +330,42 @@ public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDea
updateLastFlushedSequenceIds(sn, sl);
}

/**
* Checks if the Master is configured to reject decommissioned hosts or not. When it's configured
* to do so, any RegionServer trying to join the cluster will have it's host checked against the
* list of hosts of currently decommissioned servers and potentially get prevented from reporting
* for duty; otherwise, we do nothing and we let them pass to the next check. See HBASE-28342 for
* details.
* @param sn The ServerName to check for
* @throws DecommissionedHostRejectedException if the Master is configured to reject
* decommissioned hosts and this host exists in the
* list of the decommissioned servers
*/
private void checkRejectableDecommissionedStatus(ServerName sn)
throws DecommissionedHostRejectedException {
LOG.info("Checking decommissioned status of RegionServer {}", sn.getServerName());

// If the Master is not configured to reject decommissioned hosts, return early.
if (!rejectDecommissionedHostsConfig) {
return;
}

// Look for a match for the hostname in the list of decommissioned servers
for (ServerName server : getDrainingServersList()) {
if (Objects.equals(server.getHostname(), sn.getHostname())) {
// Found a match and master is configured to reject decommissioned hosts, throw exception!
LOG.warn(
"Rejecting RegionServer {} from reporting for duty because Master is configured "
+ "to reject decommissioned hosts and this host was marked as such in the past.",
sn.getServerName());
throw new DecommissionedHostRejectedException(String.format(
"Host %s exists in the list of decommissioned servers and Master is configured to "
+ "reject decommissioned hosts",
sn.getHostname()));
}
}
}

/**
* Check is a server of same host and port already exists, if not, or the existed one got a
* smaller start code, record it.
Expand Down Expand Up @@ -647,13 +720,8 @@ public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
* Remove the server from the drain list.
*/
public synchronized boolean removeServerFromDrainList(final ServerName sn) {
// Warn if the server (sn) is not online. ServerName is of the form:
// <hostname> , <port> , <startcode>
LOG.info("Removing server {} from the draining list.", sn);

if (!this.isServerOnline(sn)) {
LOG.warn("Server " + sn + " is not currently online. "
+ "Removing from draining list anyway, as requested.");
}
// Remove the server from the draining servers lists.
return this.drainingServers.remove(sn);
}
Expand All @@ -663,22 +731,23 @@ public synchronized boolean removeServerFromDrainList(final ServerName sn) {
* @return True if the server is added or the server is already on the drain list.
*/
public synchronized boolean addServerToDrainList(final ServerName sn) {
// Warn if the server (sn) is not online. ServerName is of the form:
// <hostname> , <port> , <startcode>

if (!this.isServerOnline(sn)) {
LOG.warn("Server " + sn + " is not currently online. "
+ "Ignoring request to add it to draining list.");
// If master is not rejecting decommissioned hosts, warn if the server (sn) is not online.
// However, we want to add servers even if they're not online if the master is configured
// to reject decommissioned hosts
if (!rejectDecommissionedHostsConfig && !this.isServerOnline(sn)) {
LOG.warn("Server {} is not currently online. Ignoring request to add it to draining list.",
sn);
return false;
}
// Add the server to the draining servers lists, if it's not already in
// it.

// Add the server to the draining servers lists, if it's not already in it.
if (this.drainingServers.contains(sn)) {
LOG.warn("Server " + sn + " is already in the draining server list."
+ "Ignoring request to add it again.");
LOG.warn(
"Server {} is already in the draining server list. Ignoring request to add it again.", sn);
return true;
}
LOG.info("Server " + sn + " added to draining server list.");

LOG.info("Server {} added to draining server list.", sn);
return this.drainingServers.add(sn);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.DecommissionedHostRejectedException;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
Expand Down Expand Up @@ -2664,6 +2665,11 @@ private RegionServerStartupResponse reportForDuty() throws IOException {
LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
// Re-throw IOE will cause RS to abort
throw ioe;
} else if (ioe instanceof DecommissionedHostRejectedException) {
LOG.error(HBaseMarkers.FATAL,
"Master rejected startup because the host is considered decommissioned", ioe);
// Re-throw IOE will cause RS to abort
throw ioe;
} else if (ioe instanceof ServerNotRunningYetException) {
LOG.debug("Master is not running yet");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.*;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -30,9 +35,11 @@
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.DecommissionedHostRejectedException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
Expand Down Expand Up @@ -241,6 +248,72 @@ public void run() {
waitForClusterOnline(master);
}

/**
* Tests that the RegionServer's reportForDuty gets rejected by the master when the master is
* configured to reject decommissioned hosts and when there is a match for the joining
* RegionServer in the list of decommissioned servers. Test case for HBASE-28342.
*/
@Test
public void testReportForDutyGetsRejectedByMasterWhenConfiguredToRejectDecommissionedHosts()
throws Exception {
// Start a master and wait for it to become the active/primary master.
// Use a random unique port
cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
aalhour marked this conversation as resolved.
Show resolved Hide resolved
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);

// Set the cluster to reject decommissioned hosts
cluster.getConfiguration().setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true);

master = cluster.addMaster();
rs = cluster.addRegionServer();
master.start();
rs.start();
waitForClusterOnline(master);

// Add a second decommissioned region server to the cluster, wait for it to fail reportForDuty
LogCapturer capturer =
new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
.getLogger(HRegionServer.class));

rs2 = cluster.addRegionServer();
master.getMaster().decommissionRegionServers(
Collections.singletonList(rs2.getRegionServer().getServerName()), false);
rs2.start();

// Assert that the second regionserver has aborted
testUtil.waitFor(TimeUnit.SECONDS.toMillis(90),
new MatcherPredicate<>(() -> rs2.getRegionServer().isAborted(), is(true)));

// Assert that the log messages for DecommissionedHostRejectedException exist in the logs
capturer.stopCapturing();

assertThat(capturer.getOutput(),
containsString("Master rejected startup because the host is considered decommissioned"));

/**
* Assert that the following log message occurred (one line):
* "org.apache.hadoop.hbase.master.DecommissionedHostRejectedException:
* org.apache.hadoop.hbase.master.DecommissionedHostRejectedException: Host localhost exists in
* the list of decommissioned servers and Master is configured to reject decommissioned hosts"
*/
assertThat(Arrays.asList(capturer.getOutput().split("\n")),
hasItem(allOf(containsString(DecommissionedHostRejectedException.class.getSimpleName()),
containsString(DecommissionedHostRejectedException.class.getSimpleName()),
containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
+ " exists in the list of decommissioned servers and Master is configured to reject"
+ " decommissioned hosts"))));

assertThat(Arrays.asList(capturer.getOutput().split("\n")),
hasItem(
allOf(containsString("ABORTING region server " + rs2.getRegionServer().getServerName()),
containsString("Unhandled"),
containsString(DecommissionedHostRejectedException.class.getSimpleName()),
containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
+ " exists in the list of decommissioned servers and Master is configured to reject"
+ " decommissioned hosts"))));
}

/**
* Tests region sever reportForDuty with a non-default environment edge
*/
Expand Down