Skip to content

Commit

Permalink
HBASE-28146: Make ServerManager rsAdmins map thread safe (apache#5461)
Browse files Browse the repository at this point in the history
Co-authored-by: Ray Mattingly <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Bryan Beaudreault <[email protected]>
(cherry picked from commit 1641a4a)
  • Loading branch information
rmdmattingly authored and Apache9 committed Oct 23, 2023
1 parent cd3a83f commit dc5539c
Showing 1 changed file with 7 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -44,6 +43,7 @@
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
Expand Down Expand Up @@ -123,12 +123,6 @@ public class ServerManager {
private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
new ConcurrentSkipListMap<>();

/**
* Map of admin interfaces per registered regionserver; these interfaces we use to control
* regionservers out on the cluster
*/
private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>();

/** List of region servers that should not get any more new regions. */
private final ArrayList<ServerName> drainingServers = new ArrayList<>();

Expand Down Expand Up @@ -393,7 +387,6 @@ private ServerName findServerWithSameHostnamePortWithLock(final ServerName serve
void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
LOG.info("Registering regionserver=" + serverName);
this.onlineServers.put(serverName, sl);
this.rsAdmins.remove(serverName);
}

public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
Expand Down Expand Up @@ -595,7 +588,6 @@ public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
LOG.trace("Expiration of {} but server not online", sn);
}
}
this.rsAdmins.remove(sn);
}

/*
Expand Down Expand Up @@ -707,18 +699,13 @@ public static void closeRegionSilentlyAndWait(ClusterConnection connection, Serv
* @throws RetriesExhaustedException wrapping a ConnectException if failed
*/
public AdminService.BlockingInterface getRsAdmin(final ServerName sn) throws IOException {
AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
if (admin == null) {
LOG.debug("New admin connection to " + sn.toString());
if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
// A master is also a region server now, see HBASE-10569 for details
admin = ((HRegionServer) master).getRSRpcServices();
} else {
admin = this.connection.getAdmin(sn);
}
this.rsAdmins.put(sn, admin);
LOG.debug("New admin connection to {}", sn);
if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
// A master is also a region server now, see HBASE-10569 for details
return ((HRegionServer) master).getRSRpcServices();
} else {
return this.connection.getAdmin(sn);
}
return admin;
}

/**
Expand Down

0 comments on commit dc5539c

Please sign in to comment.