Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-22081 master shutdown: close RpcServer and procWAL first thing #200

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1516,8 +1516,6 @@ protected void stopServiceThreads() {
this.assignmentManager.stop();
}

stopProcedureExecutor();

if (this.walManager != null) {
this.walManager.stop();
}
Expand Down Expand Up @@ -1565,7 +1563,9 @@ private void startProcedureExecutor() throws IOException {
procedureExecutor.startWorkers();
}

private void stopProcedureExecutor() {
@Override
protected void stopProcedureExecutorAndStore() {
super.stopProcedureExecutorAndStore();
if (procedureExecutor != null) {
configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
procedureExecutor.getEnvironment().getRemoteDispatcher().stop();
Expand Down Expand Up @@ -2968,7 +2968,7 @@ public void abort(String reason, Throwable cause) {
}

try {
stopMaster();
stopMaster(false);
} catch (IOException e) {
LOG.error("Exception occurred while stopping master", e);
}
Expand Down Expand Up @@ -3050,16 +3050,25 @@ public void shutdown() throws IOException {
}

/**
 * Stops the master without flagging the stop as RPC-initiated.
 * Equivalent to calling {@code stopMaster(false)}.
 *
 * @throws IOException if {@code stopMaster(boolean)} throws it
 */
public void stopMaster() throws IOException {
stopMaster(false);
}

public void stopMaster(boolean isRpc) throws IOException {
if (cpHost != null) {
cpHost.preStopMaster();
}
stop("Stopped by " + Thread.currentThread().getName());
stop("Stopped by " + Thread.currentThread().getName(), isRpc);
}

/**
 * Stops this server without flagging the stop as RPC-initiated.
 * Equivalent to calling {@code stop(msg, false)}.
 *
 * @param msg the stop reason, passed through unchanged to {@code stop(String, boolean)}
 */
@Override
public void stop(String msg) {
stop(msg, false);
}

@Override
public void stop(String msg, boolean isRpc) {
if (!isStopped()) {
super.stop(msg);
super.stop(msg, isRpc);
if (this.activeMasterManager != null) {
this.activeMasterManager.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1551,7 +1551,7 @@ public StopMasterResponse stopMaster(RpcController controller,
StopMasterRequest request) throws ServiceException {
LOG.info(master.getClientIdAuditPrefix() + " stop");
try {
master.stopMaster();
master.stopMaster(true);
} catch (IOException e) {
LOG.error("Exception occurred while stopping master", e);
throw new ServiceException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -195,6 +196,11 @@ protected Flow executeFromState(final MasterProcedureEnv env, final CloneSnapsho
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (IOException e) {
// Compare against the exact class: a plain InterruptedIOException signals that the thread was
// interrupted, whereas its subclasses are thrown for other reasons and must not be treated so.
// The master is probably shutting down, so there is no reason to retry.
if (e.getClass() == InterruptedIOException.class) {
throw new InterruptedException(e.getMessage());
}
if (isRollbackSupported(state)) {
setFailure("master-clone-snapshot", e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
Expand Down Expand Up @@ -87,6 +88,11 @@ protected Flow executeFromState(final MasterProcedureEnv env, final CreateNamesp
throw new UnsupportedOperationException(this + " unhandled state=" + state);
}
} catch (IOException e) {
// Compare against the exact class: a plain InterruptedIOException signals that the thread was
// interrupted, whereas its subclasses are thrown for other reasons and must not be treated so.
// The master is probably shutting down, so there is no reason to retry.
if (e.getClass() == InterruptedIOException.class) {
throw new InterruptedException(e.getMessage());
}
if (isRollbackSupported(state)) {
setFailure("master-create-namespace", e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -123,6 +124,11 @@ protected Flow executeFromState(final MasterProcedureEnv env, final CreateTableS
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (IOException e) {
// Compare against the exact class: a plain InterruptedIOException signals that the thread was
// interrupted, whereas its subclasses are thrown for other reasons and must not be treated so.
// The master is probably shutting down, so there is no reason to retry.
if (e.getClass() == InterruptedIOException.class) {
throw new InterruptedException(e.getMessage());
}
if (isRollbackSupported(state)) {
setFailure("master-create-table", e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -95,6 +97,11 @@ protected Flow executeFromState(MasterProcedureEnv env, DeleteNamespaceState sta
throw new UnsupportedOperationException(this + " unhandled state=" + state);
}
} catch (IOException e) {
// Compare against the exact class: a plain InterruptedIOException signals that the thread was
// interrupted, whereas its subclasses are thrown for other reasons and must not be treated so.
// The master is probably shutting down, so there is no reason to retry.
if (e.getClass() == InterruptedIOException.class) {
throw new InterruptedException(e.getMessage());
}
if (isRollbackSupported(state)) {
setFailure("master-delete-namespace", e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -135,6 +136,11 @@ protected Flow executeFromState(final MasterProcedureEnv env, DeleteTableState s
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (IOException e) {
// Compare against the exact class: a plain InterruptedIOException signals that the thread was
// interrupted, whereas its subclasses are thrown for other reasons and must not be treated so.
// The master is probably shutting down, so there is no reason to retry.
if (e.getClass() == InterruptedIOException.class) {
throw new InterruptedException(e.getMessage());
}
if (isRollbackSupported(state)) {
setFailure("master-delete-table", e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -180,6 +181,11 @@ protected Flow executeFromState(final MasterProcedureEnv env, final EnableTableS
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (IOException e) {
// Compare against the exact class: a plain InterruptedIOException signals that the thread was
// interrupted, whereas its subclasses are thrown for other reasons and must not be treated so.
// The master is probably shutting down, so there is no reason to retry.
if (e.getClass() == InterruptedIOException.class) {
throw new InterruptedException(e.getMessage());
}
if (isRollbackSupported(state)) {
setFailure("master-enable-table", e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.constraint.ConstraintException;
Expand Down Expand Up @@ -82,6 +83,11 @@ protected Flow executeFromState(final MasterProcedureEnv env, final ModifyNamesp
throw new UnsupportedOperationException(this + " unhandled state=" + state);
}
} catch (IOException e) {
// Compare against the exact class: a plain InterruptedIOException signals that the thread was
// interrupted, whereas its subclasses are thrown for other reasons and must not be treated so.
// The master is probably shutting down, so there is no reason to retry.
if (e.getClass() == InterruptedIOException.class) {
throw new InterruptedException(e.getMessage());
}
if (isRollbackSupported(state)) {
setFailure("master-modify-namespace", e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -136,6 +137,11 @@ protected Flow executeFromState(final MasterProcedureEnv env, final ModifyTableS
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (IOException e) {
// Compare against the exact class: a plain InterruptedIOException signals that the thread was
// interrupted, whereas its subclasses are thrown for other reasons and must not be treated so.
// The master is probably shutting down, so there is no reason to retry.
if (e.getClass() == InterruptedIOException.class) {
throw new InterruptedException(e.getMessage());
}
if (isRollbackSupported(state)) {
setFailure("master-modify-table", e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -161,6 +162,9 @@ protected Flow executeFromState(final MasterProcedureEnv env, final RestoreSnaps
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (InterruptedIOException e) {
// Handle interrupted IO; master is probably shutting down, no reason to retry.
throw new InterruptedException(e.getMessage());
} catch (IOException e) {
if (isRollbackSupported(state)) {
setFailure("master-restore-snapshot", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -149,6 +150,11 @@ protected Flow executeFromState(final MasterProcedureEnv env, TruncateTableState
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (IOException e) {
// Compare against the exact class: a plain InterruptedIOException signals that the thread was
// interrupted, whereas its subclasses are thrown for other reasons and must not be treated so.
// The master is probably shutting down, so there is no reason to retry.
if (e.getClass() == InterruptedIOException.class) {
throw new InterruptedException(e.getMessage());
}
if (isRollbackSupported(state)) {
setFailure("master-truncate-table", e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,9 @@ public class HRegionServer extends HasThread implements
// Set when a report to the master comes back with a message asking us to
// shutdown. Also set by call to stop when debugging or running unit tests
// of HRegionServer in isolation.
// TODO: both of those are not protected by any lock...
private volatile boolean stopped = false;
private volatile boolean stoppedFromRpc = false;

// Go down hard. Used if file system becomes unavailable and also in
// debugging and unit tests.
Expand Down Expand Up @@ -580,6 +582,7 @@ public HRegionServer(Configuration conf) throws IOException {

this.abortRequested = false;
this.stopped = false;
this.stoppedFromRpc = false;

rpcServices = createRpcServices();
useThisHostnameInstead = getUseThisHostnameInstead(conf);
Expand Down Expand Up @@ -1062,6 +1065,27 @@ public void run() {
}
}

// First, shut down RPC services to minimize the chance of a race in which we respond to
// requests in the middle of an abort, from partial/invalid state.
if (this.rpcServices != null) {
// Note: if we were stopped by an RPC request, this is a race. Previously there was enough time
// to respond to that request while everything else was shutting down. That artificial wait is
// gone, so when stopped from RPC we wait explicitly. It is still a race, so the client may
// receive a disconnect; there seems to be no way to wait for a particular response to be sent.
if (stoppedFromRpc) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore, we will complete shutdown.
}
}
this.rpcServices.stop();
}

// Then, shut down procedures for the same reason; avoid persisting procedures in bad state.
stopProcedureExecutorAndStore();

if (this.leases != null) {
this.leases.closeAfterLeasesExpire();
}
Expand Down Expand Up @@ -1093,7 +1117,6 @@ public void run() {
if (this.hMemManager != null) this.hMemManager.stop();
if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
sendShutdownInterrupt();

// Stop the snapshot and other procedure handlers, forcefully killing all running tasks
if (rspmHost != null) {
Expand Down Expand Up @@ -1173,10 +1196,6 @@ public void run() {
stopServiceThreads();
}

if (this.rpcServices != null) {
this.rpcServices.stop();
}

try {
deleteMyEphemeralNode();
} catch (KeeperException.NoNodeException nn) {
Expand Down Expand Up @@ -2187,7 +2206,11 @@ public ClusterConnection getClusterConnection() {

@Override
public void stop(final String msg) {
stop(msg, false, RpcServer.getRequestUser().orElse(null));
stop(msg, false);
}

/**
 * Stops the server as a non-forced stop. Delegates to the four-argument {@code stop} with
 * {@code force} set to {@code false} and the user returned by
 * {@code RpcServer.getRequestUser()}, or {@code null} when that is empty.
 *
 * @param msg the stop reason, passed through unchanged
 * @param isRpc forwarded to the four-argument {@code stop}, which folds it into the
 *   {@code stoppedFromRpc} flag
 */
public void stop(final String msg, boolean isRpc) {
stop(msg, false, RpcServer.getRequestUser().orElse(null), isRpc);
}

/**
Expand All @@ -2196,7 +2219,7 @@ public void stop(final String msg) {
* @param force True if this is a regionserver abort
* @param user The user executing the stop request, or null if no user is associated
*/
public void stop(final String msg, final boolean force, final User user) {
public void stop(final String msg, final boolean force, final User user, boolean isRpc) {
if (!this.stopped) {
LOG.info("***** STOPPING region server '" + this + "' *****");
if (this.rsHost != null) {
Expand All @@ -2211,6 +2234,7 @@ public void stop(final String msg, final boolean force, final User user) {
LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
}
}
this.stoppedFromRpc = isRpc || this.stoppedFromRpc;
this.stopped = true;
LOG.info("STOPPED: " + msg);
// Wakes run() if it is sleeping
Expand Down Expand Up @@ -2400,6 +2424,10 @@ public RSRpcServices getRSRpcServices() {
return rpcServices;
}

/**
 * Shutdown hook called from {@code run()} immediately after the RPC services are stopped.
 * The base implementation does nothing; subclasses may override it to stop their procedure
 * executor and store early in the shutdown sequence.
 */
protected void stopProcedureExecutorAndStore() {
// No-op in RS; there's no procedure store so we don't have to shut the executor down early.
}

/**
* Cause the server to exit without closing the regions it is serving, the log
* it is using and without notifying the master. Used unit testing and on
Expand Down Expand Up @@ -2449,7 +2477,7 @@ public void abort(String reason, Throwable cause) {
LOG.warn("Unable to report fatal error to master", t);
}
// shutdown should be run as the internal user
stop(reason, true, null);
stop(reason, true, null, false);
}

/**
Expand All @@ -2475,12 +2503,6 @@ protected void kill() {
abort("Simulated kill");
}

/**
* Called on stop/abort before closing the cluster connection and meta locator.
*/
protected void sendShutdownInterrupt() {
}

/**
* Wait on all threads to finish. Presumption is that all closes and stops
* have already been called.
Expand Down
Loading