Skip to content

Commit

Permalink
HBASE-26245 Store region server list in master local region (#4136)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Purtell <[email protected]>
(cherry picked from commit bb1bbdd)
(cherry picked from commit 2711142)
  • Loading branch information
Apache9 committed Mar 31, 2022
1 parent 08ecae9 commit 75c81c7
Show file tree
Hide file tree
Showing 17 changed files with 368 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// the master local storage to store procedure data, meta region locations, etc.
private MasterRegion masterRegion;

private RegionServerList rsListStorage;

// handle table states
private TableStateManager tableStateManager;

Expand Down Expand Up @@ -896,14 +898,19 @@ private void finishActiveMasterInitialization(MonitoredTask status)
}

status.setStatus("Initialize ServerManager and schedule SCP for crash servers");
this.serverManager = createServerManager(this);
// The below two managers must be created before loading procedures, as they will be used during
// loading.
// initialize master local region
masterRegion = MasterRegionFactory.create(this);
rsListStorage = new MasterRegionServerList(masterRegion, this);

this.serverManager = createServerManager(this, rsListStorage);
if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
this.splitWALManager = new SplitWALManager(this);
}

// initialize master local region
masterRegion = MasterRegionFactory.create(this);


tryMigrateMetaLocationsFromZooKeeper();

Expand Down Expand Up @@ -932,7 +939,8 @@ private void finishActiveMasterInitialization(MonitoredTask status)
this.regionServerTracker.upgrade(
procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
.map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
walManager.getLiveServersFromWALDir(), walManager.getSplittingServersFromWALDir());
Sets.union(rsListStorage.getAll(), walManager.getLiveServersFromWALDir()),
walManager.getSplittingServersFromWALDir());
// This manager will be started AFTER hbase:meta is confirmed on line.
// hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table
// state from zookeeper while hbase2 reads it from hbase:meta. Disable if no hbase1 clients.
Expand Down Expand Up @@ -1376,11 +1384,12 @@ private void initMobCleaner() {
* </p>
*/
@InterfaceAudience.Private
protected ServerManager createServerManager(final MasterServices master) throws IOException {
protected ServerManager createServerManager(MasterServices master,
RegionServerList storage) throws IOException {
// We put this out here in a method so can do a Mockito.spy and stub it out
// w/ a mocked up ServerManager.
setupClusterConnection();
return new ServerManager(master);
return new ServerManager(master, storage);
}

private void waitForRegionServers(final MonitoredTask status)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.assignment.ServerState;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link MasterRegion} based {@link RegionServerList}.
* <p/>
* This is useful when we want to restart a cluster with only the data on file system, as when
* restarting, we need to get the previous live region servers for scheduling SCP. Before we have
* this class, we need to scan the WAL directory on WAL file system to find out the previous live
* region servers, which means we can not restart a cluster without the previous WAL file system,
* even if we have flushed all the data.
* <p/>
* Please see HBASE-26245 for more details.
*/
@InterfaceAudience.Private
public class MasterRegionServerList implements RegionServerList {

private static final Logger LOG = LoggerFactory.getLogger(MasterRegionServerList.class);

private final MasterRegion region;

private final Abortable abortable;

public MasterRegionServerList(MasterRegion region, Abortable abortable) {
this.region = region;
this.abortable = abortable;
}

@Override
public void started(ServerName sn) {
Put put =
new Put(Bytes.toBytes(sn.getServerName())).addColumn(MasterRegionFactory.REGION_SERVER_FAMILY,
HConstants.STATE_QUALIFIER, Bytes.toBytes(ServerState.ONLINE.name()));
try {
region.update(r -> r.put(put));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to record region server {} as started, aborting...", sn,
e);
abortable.abort("Failed to record region server as started");
throw new UncheckedIOException(e);
}
}

@Override
public void expired(ServerName sn) {
Delete delete = new Delete(Bytes.toBytes(sn.getServerName()))
.addFamily(MasterRegionFactory.REGION_SERVER_FAMILY);
try {
region.update(r -> r.delete(delete));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to record region server {} as expired, aborting...", sn,
e);
abortable.abort("Failed to record region server as expired");
throw new UncheckedIOException(e);
}
}

@Override
public Set<ServerName> getAll() throws IOException {
Set<ServerName> rsList = new HashSet<>();
try (ResultScanner scanner =
region.getScanner(new Scan().addFamily(MasterRegionFactory.REGION_SERVER_FAMILY))) {
for (;;) {
Result result = scanner.next();
if (result == null) {
break;
}
rsList.add(ServerName.valueOf(Bytes.toString(result.getRow())));
}
}
return rsList;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ private boolean checkFileSystem() {

/**
* Get Servernames which are currently splitting; paths have a '-splitting' suffix.
* @return ServerName
* @throws IOException IOException
*/
public Set<ServerName> getSplittingServersFromWALDir() throws IOException {
return getServerNamesFromWALDirPath(
Expand All @@ -168,8 +166,6 @@ public Set<ServerName> getSplittingServersFromWALDir() throws IOException {
/**
* Get Servernames that COULD BE 'alive'; excludes those that have a '-splitting' suffix as these
* are already being split -- they cannot be 'alive'.
* @return ServerName
* @throws IOException IOException
*/
public Set<ServerName> getLiveServersFromWALDir() throws IOException {
return getServerNamesFromWALDirPath(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;

/**
* For storing the region server list.
* <p/>
* Mainly be used when restarting master, to load the previous active region server list.
*/
@InterfaceAudience.Private
public interface RegionServerList {

/**
* Called when a region server join the cluster.
*/
void started(ServerName sn);

/**
* Called when a region server is dead.
*/
void expired(ServerName sn);

/**
* Get all live region servers.
*/
Set<ServerName> getAll() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,22 @@ private RegionServerInfo getServerInfo(ServerName serverName)
* {@link ServerManager#findDeadServersAndProcess(Set, Set)}, we call it here under the lock
* protection to prevent concurrency issues with server expiration operation.
* @param deadServersFromPE the region servers which already have SCP associated.
* @param liveServersFromWALDir the live region servers from wal directory.
* @param liveServersBeforeRestart the live region servers we recorded before master restarts.
* @param splittingServersFromWALDir Servers whose WALs are being actively 'split'.
*/
public void upgrade(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersFromWALDir,
public void upgrade(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersBeforeRestart,
Set<ServerName> splittingServersFromWALDir) throws KeeperException, IOException {
LOG.info(
"Upgrading RegionServerTracker to active master mode; {} have existing" +
"ServerCrashProcedures, {} possibly 'live' servers, and {} 'splitting'.",
deadServersFromPE.size(), liveServersFromWALDir.size(), splittingServersFromWALDir.size());
deadServersFromPE.size(), liveServersBeforeRestart.size(), splittingServersFromWALDir.size());
// deadServersFromPE is made from a list of outstanding ServerCrashProcedures.
// splittingServersFromWALDir are being actively split -- the directory in the FS ends in
// '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not.
splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s)).
forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
// create ServerNode for all possible live servers from wal directory
liveServersFromWALDir
liveServersBeforeRestart
.forEach(sn -> server.getAssignmentManager().getRegionStates().getOrCreateServer(sn));
ServerManager serverManager = server.getServerManager();
synchronized (this) {
Expand All @@ -142,7 +142,7 @@ public void upgrade(Set<ServerName> deadServersFromPE, Set<ServerName> liveServe
info.getVersionInfo().getVersion()) : ServerMetricsBuilder.of(serverName);
serverManager.checkAndRecordNewServer(serverName, serverMetrics);
}
serverManager.findDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir);
serverManager.findDeadServersAndProcess(deadServersFromPE, liveServersBeforeRestart);
active = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public class ServerManager {

private final MasterServices master;
private final ClusterConnection connection;
private final RegionServerList storage;

private final DeadServer deadservers = new DeadServer();

Expand All @@ -153,8 +154,9 @@ public class ServerManager {
/**
* Constructor.
*/
public ServerManager(final MasterServices master) {
public ServerManager(final MasterServices master, RegionServerList storage) {
this.master = master;
this.storage = storage;
Configuration c = master.getConfiguration();
maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
Expand Down Expand Up @@ -185,7 +187,6 @@ public boolean unregisterListener(final ServerListener listener) {
* @param version the version of the new regionserver, could contain strings like "SNAPSHOT"
* @param ia the InetAddress from which request is received
* @return The ServerName we know this server as.
* @throws IOException
*/
ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
String version, InetAddress ia) throws IOException {
Expand All @@ -206,13 +207,12 @@ ServerName regionServerStartup(RegionServerStartupRequest request, int versionNu
LOG.warn(
"THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
}
storage.started(sn);
return sn;
}

/**
* Updates last flushed sequence Ids for the regions on server sn
* @param sn
* @param hsl
*/
private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
Expand Down Expand Up @@ -581,6 +581,7 @@ synchronized long expireServer(final ServerName serverName, boolean force) {
}
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
storage.expired(serverName);
// Tell our listeners that a server was removed
if (!this.listeners.isEmpty()) {
this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,16 @@ public final class MasterRegionFactory {

public static final byte[] PROC_FAMILY = Bytes.toBytes("proc");

public static final byte[] REGION_SERVER_FAMILY = Bytes.toBytes("rs");

private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY)
.setMaxVersions(HConstants.DEFAULT_HBASE_META_VERSIONS).setInMemory(true)
.setBlocksize(HConstants.DEFAULT_HBASE_META_BLOCK_SIZE).setBloomFilterType(BloomType.ROWCOL)
.setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY)).build();
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(REGION_SERVER_FAMILY))
.build();

private static TableDescriptor withTrackerConfigs(Configuration conf) {
String trackerImpl = conf.get(TRACKER_IMPL, conf.get(StoreFileTrackerFactory.TRACKER_IMPL,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.hbase.ServerName;

public class DummyRegionServerList implements RegionServerList {

@Override
public void started(ServerName sn) {
}

@Override
public void expired(ServerName sn) {
}

@Override
public Set<ServerName> getAll() throws IOException {
return Collections.emptySet();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public ClusterConnection getClusterConnection() {
when(conn.getRpcControllerFactory()).thenReturn(mock(RpcControllerFactory.class));
return conn;
}
});
}, new DummyRegionServerList());

LOG.debug("regionServerStartup 1");
InetAddress ia1 = InetAddress.getLocalHost();
Expand Down
Loading

0 comments on commit 75c81c7

Please sign in to comment.