Skip to content

Commit

Permalink
HBASE-23615 Use a dedicated thread for executing WorkerMonitor in Pro… (
Browse files Browse the repository at this point in the history
#961)

Signed-off-by: stack <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: virajjasani <[email protected]>
  • Loading branch information
binlijin committed Dec 31, 2019
1 parent ec5a7bb commit fcfe9ab
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,15 @@ public interface ProcedureExecutorListener {
*/
private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

/**
* WorkerMonitor check for stuck workers and new worker thread when necessary, for example if
* there is no worker to assign meta, it will new worker thread for it, so it is very important.
* TimeoutExecutor execute many tasks like DeadServerMetricRegionChore RegionInTransitionChore
* and so on, some tasks may execute for a long time so will block other tasks like
* WorkerMonitor, so use a dedicated thread for executing WorkerMonitor.
*/
private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;

private int corePoolSize;
private int maxPoolSize;
private int urgentPoolSize;
Expand Down Expand Up @@ -590,7 +599,8 @@ public void init(int numThreads, int urgentNumThreads,
corePoolSize, maxPoolSize, urgentPoolSize);

this.threadGroup = new ThreadGroup("PEWorkerGroup");
this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup);
this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");

// Create the workers
workerId.set(0);
Expand Down Expand Up @@ -640,6 +650,7 @@ public void startWorkers() throws IOException {
LOG.debug("Start workers {}, urgent workers {}", workerThreads.size(),
urgentWorkerThreads.size());
timeoutExecutor.start();
workerMonitorExecutor.start();
for (WorkerThread worker: workerThreads) {
worker.start();
}
Expand All @@ -649,7 +660,7 @@ public void startWorkers() throws IOException {
}

// Internal chores
timeoutExecutor.add(new WorkerMonitor());
workerMonitorExecutor.add(new WorkerMonitor());

if (upgradeTo2_2) {
timeoutExecutor.add(new InlineChore() {
Expand Down Expand Up @@ -683,6 +694,7 @@ public void stop() {
LOG.info("Stopping");
scheduler.stop();
timeoutExecutor.sendStopSignal();
workerMonitorExecutor.sendStopSignal();
}

@VisibleForTesting
Expand All @@ -691,6 +703,8 @@ public void join() {

// stop the timeout executor
timeoutExecutor.awaitTermination();
// stop the work monitor executor
workerMonitorExecutor.awaitTermination();

// stop the worker threads
for (WorkerThread worker: workerThreads) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread {

private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();

public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group) {
super(group, "ProcExecTimeout");
public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group,
String name) {
super(group, name);
setDaemon(true);
this.executor = executor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class TestProcedureAdmin {

protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();


private static void setupConf(Configuration conf) {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
Expand Down

0 comments on commit fcfe9ab

Please sign in to comment.