Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-23615 Use a dedicated thread for executing WorkerMonitor in Pro… #961

Merged
merged 3 commits into from
Dec 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,15 @@ public interface ProcedureExecutorListener {
*/
private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

/**
* WorkerMonitor check for stuck workers and new worker thread when necessary, for example if
* there is no worker to assign meta, it will new worker thread for it, so it is very important.
* TimeoutExecutor execute many tasks like DeadServerMetricRegionChore RegionInTransitionChore
* and so on, some tasks may execute for a long time so will block other tasks like
* WorkerMonitor, so use a dedicated thread for executing WorkerMonitor.
*/
private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;

private int corePoolSize;
private int maxPoolSize;

Expand Down Expand Up @@ -560,7 +569,8 @@ public void init(int numThreads, boolean abortOnCorruption) throws IOException {
corePoolSize, maxPoolSize);

this.threadGroup = new ThreadGroup("PEWorkerGroup");
this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup);
this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");

// Create the workers
workerId.set(0);
Expand Down Expand Up @@ -604,12 +614,13 @@ public void startWorkers() throws IOException {
// Start the executors. Here we must have the lastProcId set.
LOG.trace("Start workers {}", workerThreads.size());
timeoutExecutor.start();
workerMonitorExecutor.start();
for (WorkerThread worker: workerThreads) {
worker.start();
}

// Internal chores
timeoutExecutor.add(new WorkerMonitor());
workerMonitorExecutor.add(new WorkerMonitor());

// Add completed cleaner chore
addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
Expand All @@ -624,6 +635,7 @@ public void stop() {
LOG.info("Stopping");
scheduler.stop();
timeoutExecutor.sendStopSignal();
workerMonitorExecutor.sendStopSignal();
}

@VisibleForTesting
Expand All @@ -632,6 +644,8 @@ public void join() {

// stop the timeout executor
timeoutExecutor.awaitTermination();
// stop the work monitor executor
workerMonitorExecutor.awaitTermination();

// stop the worker threads
for (WorkerThread worker: workerThreads) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread {

private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();

public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group) {
super(group, "ProcExecTimeout");
public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group,
String name) {
super(group, name);
setDaemon(true);
this.executor = executor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class TestProcedureAdmin {

protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();


private static void setupConf(Configuration conf) {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
}
Expand Down