Skip to content

Commit

Permalink
HBASE-22933 Do not need to kick reassign for rs group change any more (
Browse files Browse the repository at this point in the history
…#550)

Signed-off-by: Guanghao Zhang <[email protected]>
  • Loading branch information
Apache9 authored Aug 30, 2019
1 parent f6a4c66 commit 090c55f
Showing 1 changed file with 0 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.rsgroup;

import com.google.protobuf.ServiceException;
Expand All @@ -33,7 +32,6 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
Expand All @@ -45,7 +43,6 @@
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
Expand All @@ -59,7 +56,6 @@
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.net.Address;
Expand Down Expand Up @@ -136,7 +132,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private Set<String> prevRSGroups = new HashSet<>();
private final ServerEventsListenerThread serverEventsListenerThread =
new ServerEventsListenerThread();
private FailedOpenUpdaterThread failedOpenUpdaterThread;

private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
this.masterServices = masterServices;
Expand All @@ -150,9 +145,6 @@ private synchronized void init() throws IOException {
refresh();
serverEventsListenerThread.start();
masterServices.getServerManager().registerListener(serverEventsListenerThread);
failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
failedOpenUpdaterThread.start();
masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
}

static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
Expand Down Expand Up @@ -625,26 +617,6 @@ private synchronized void updateDefaultServers(SortedSet<Address> servers) throw
flushConfig(newGroupMap);
}

// Called by FailedOpenUpdaterThread
private void updateFailedAssignments() {
// Kick all regions in FAILED_OPEN state
List<RegionInfo> stuckAssignments = Lists.newArrayList();
for (RegionStateNode state : masterServices.getAssignmentManager().getRegionStates()
.getRegionsInTransition()) {
if (state.isStuck()) {
stuckAssignments.add(state.getRegionInfo());
}
}
for (RegionInfo region : stuckAssignments) {
LOG.info("Retrying assignment of " + region);
try {
masterServices.getAssignmentManager().unassign(region);
} catch (IOException e) {
LOG.warn("Unable to reassign " + region, e);
}
}
}

/**
* Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
* servers. Notifications about server changes are received by registering {@link ServerListener}.
Expand Down Expand Up @@ -704,66 +676,6 @@ public void run() {
}
}

private class FailedOpenUpdaterThread extends Thread implements ServerListener {
private final long waitInterval;
private volatile boolean hasChanged = false;

public FailedOpenUpdaterThread(Configuration conf) {
this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY, DEFAULT_REASSIGN_WAIT_INTERVAL);
setDaemon(true);
}

@Override
public void serverAdded(ServerName serverName) {
serverChanged();
}

@Override
public void serverRemoved(ServerName serverName) {
}

@Override
public void run() {
while (isMasterRunning(masterServices)) {
boolean interrupted = false;
try {
synchronized (this) {
while (!hasChanged) {
wait();
}
hasChanged = false;
}
} catch (InterruptedException e) {
LOG.warn("Interrupted", e);
interrupted = true;
}
if (!isMasterRunning(masterServices) || interrupted) {
continue;
}

// First, wait a while in case more servers are about to rejoin the cluster
try {
Thread.sleep(waitInterval);
} catch (InterruptedException e) {
LOG.warn("Interrupted", e);
}
if (!isMasterRunning(masterServices)) {
continue;
}

// Kick all regions in FAILED_OPEN state
updateFailedAssignments();
}
}

public void serverChanged() {
synchronized (this) {
hasChanged = true;
this.notify();
}
}
}

private class RSGroupStartupWorker extends Thread {
private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
private volatile boolean online = false;
Expand Down

0 comments on commit 090c55f

Please sign in to comment.