Skip to content

Commit

Permalink
HBASE-23615 Use a dedicated thread for executing WorkerMonitor in Pro… (
Browse files Browse the repository at this point in the history
#961)

Signed-off-by: stack <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: virajjasani <[email protected]>
  • Loading branch information
binlijin committed Dec 31, 2019
1 parent a1d5e81 commit 5b9a042
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,15 @@ public interface ProcedureExecutorListener {
*/
private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

/**
* WorkerMonitor checks for stuck workers and creates new worker threads when necessary. For
* example, if there is no worker available to assign meta, it will create a new worker thread
* for it, so it is very important.
* TimeoutExecutor executes many tasks, such as DeadServerMetricRegionChore,
* RegionInTransitionChore and so on. Some of these tasks may run for a long time and block
* other tasks like WorkerMonitor, so a dedicated thread is used for executing WorkerMonitor.
*/
private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;

private int corePoolSize;
private int maxPoolSize;

Expand Down Expand Up @@ -560,7 +569,8 @@ public void init(int numThreads, boolean abortOnCorruption) throws IOException {
corePoolSize, maxPoolSize);

this.threadGroup = new ThreadGroup("PEWorkerGroup");
this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup);
this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");

// Create the workers
workerId.set(0);
Expand Down Expand Up @@ -604,12 +614,13 @@ public void startWorkers() throws IOException {
// Start the executors. Here we must have the lastProcId set.
LOG.trace("Start workers {}", workerThreads.size());
timeoutExecutor.start();
workerMonitorExecutor.start();
for (WorkerThread worker: workerThreads) {
worker.start();
}

// Internal chores
timeoutExecutor.add(new WorkerMonitor());
workerMonitorExecutor.add(new WorkerMonitor());

// Add completed cleaner chore
addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
Expand All @@ -624,6 +635,7 @@ public void stop() {
LOG.info("Stopping");
scheduler.stop();
timeoutExecutor.sendStopSignal();
workerMonitorExecutor.sendStopSignal();
}

@VisibleForTesting
Expand All @@ -632,6 +644,8 @@ public void join() {

// stop the timeout executor
timeoutExecutor.awaitTermination();
// stop the worker monitor executor
workerMonitorExecutor.awaitTermination();

// stop the worker threads
for (WorkerThread worker: workerThreads) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread {

private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();

public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group) {
super(group, "ProcExecTimeout");
public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group,
String name) {
super(group, name);
setDaemon(true);
this.executor = executor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class TestProcedureAdmin {

protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();


/**
 * Prepares the test configuration by setting
 * {@link MasterProcedureConstants#MASTER_PROCEDURE_THREADS} to 1.
 *
 * @param conf the configuration to update in place
 */
private static void setupConf(Configuration conf) {
  // NOTE(review): presumably restricts the master procedure executor to one thread — confirm.
  final int procedureThreads = 1;
  conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, procedureThreads);
}
Expand Down

0 comments on commit 5b9a042

Please sign in to comment.