Skip to content

Commit

Permalink
Return detached threads to the pool (#8286)
Browse files Browse the repository at this point in the history
Once detached threads are finished their execution they emit the 'exit' command. Instead of a noop they should rejoin the pool.

Resolves #8201.
  • Loading branch information
VirtualTim authored and kripken committed May 20, 2019
1 parent 8f39b6c commit bb2428b
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 13 deletions.
31 changes: 18 additions & 13 deletions src/library_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,14 @@ var LibraryPThread = {
pthread.stackBase = 0;
if (pthread.worker) pthread.worker.pthread = null;
},

returnWorkerToPool: function(worker) {
delete PThread.pthreads[worker.pthread.thread];
//Note: worker is intentionally not terminated so the pool can dynamically grow.
PThread.unusedWorkerPool.push(worker);
PThread.runningWorkers.splice(PThread.runningWorkers.indexOf(worker.pthread), 1); // Not a running Worker anymore
PThread.freeThreadData(worker.pthread);
worker.pthread = undefined; // Detach the worker from the pthread object, and return it to the worker pool as an unused worker.
},
receiveObjectTransfer: function(data) {
#if OFFSCREENCANVAS_SUPPORT
if (typeof GL !== 'undefined') {
Expand Down Expand Up @@ -294,17 +301,16 @@ var LibraryPThread = {
} else if (d.cmd === 'alert') {
alert('Thread ' + d.threadId + ': ' + d.text);
} else if (d.cmd === 'exit') {
// Thread is exiting, no-op here
var detached = worker.pthread && Atomics.load(HEAPU32, (worker.pthread.thread + {{{ C_STRUCTS.pthread.detached }}}) >> 2);
if (detached) {
PThread.returnWorkerToPool(worker);
}
} else if (d.cmd === 'exitProcess') {
// A pthread has requested to exit the whole application process (runtime).
Module['noExitRuntime'] = false;
exit(d.returnCode);
} else if (d.cmd === 'cancelDone') {
PThread.freeThreadData(worker.pthread);
worker.pthread = undefined; // Detach the worker from the pthread object, and return it to the worker pool as an unused worker.
PThread.unusedWorkerPool.push(worker);
// TODO: Free if detached.
PThread.runningWorkers.splice(PThread.runningWorkers.indexOf(worker.pthread), 1); // Not a running Worker anymore.
PThread.returnWorkerToPool(worker);
} else if (d.cmd === 'objectTransfer') {
PThread.receiveObjectTransfer(e.data);
} else if (e.data.target === 'setimmediate') {
Expand Down Expand Up @@ -381,11 +387,10 @@ var LibraryPThread = {
if (!pthread_ptr) throw 'Internal Error! Null pthread_ptr in _cleanup_thread!';
{{{ makeSetValue('pthread_ptr', C_STRUCTS.pthread.self, 0, 'i32') }}};
var pthread = PThread.pthreads[pthread_ptr];
var worker = pthread.worker;
PThread.freeThreadData(pthread);
worker.pthread = undefined; // Detach the worker from the pthread object, and return it to the worker pool as an unused worker.
PThread.unusedWorkerPool.push(worker);
PThread.runningWorkers.splice(PThread.runningWorkers.indexOf(worker.pthread), 1); // Not a running Worker anymore.
if (pthread) {
var worker = pthread.worker;
PThread.returnWorkerToPool(worker);
}
},

_cancel_thread: function(pthread_ptr) {
Expand Down Expand Up @@ -729,7 +734,7 @@ var LibraryPThread = {
Atomics.store(HEAPU32, (thread + {{{ C_STRUCTS.pthread.detached }}} ) >> 2, 1); // Mark the thread as detached.

if (!ENVIRONMENT_IS_PTHREAD) __cleanup_thread(thread);
else postMessage({ cmd: 'cleanupThread', thread: thread});
else postMessage({ cmd: 'cleanupThread', thread: thread });
return 0;
}
// TODO HACK! Replace the _js variant with just _pthread_testcancel:
Expand Down
76 changes: 76 additions & 0 deletions tests/pthread/test_std_thread_detach.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2019 The Emscripten Authors. All rights reserved.
// Emscripten is available under two separate licenses, the MIT license and the
// University of Illinois/NCSA Open Source License. Both these licenses can be
// found in the LICENSE file.

#include <thread>
#include <math.h>
#include <emscripten.h>
#include <assert.h>

#ifndef REPORT_RESULT
#include <iostream>
#endif

extern "C" {
//Create a thread that does some work
void EMSCRIPTEN_KEEPALIVE spawn_a_thread() {
std::thread( [] {
double d=0;
for (int i=0; i<10; i++) //simulate work
d += (i%2 ? sqrt((int)(rand())) : (-1)*sqrt((int)(rand())));
} ).detach();
}


//Check that the number of workers is less than the number of spawned threads.
void EMSCRIPTEN_KEEPALIVE count_threads(int num_threads_spawned, int num_threads_spawned_extra) {
num_threads_spawned += num_threads_spawned_extra;
int num_workers = EM_ASM_INT({
return PThread.runningWorkers.length + PThread.unusedWorkerPool.length;
});

#ifdef REPORT_RESULT
if (num_threads_spawned_extra == 0) //check extra thread spawned
REPORT_RESULT(-1);
if (num_workers < num_threads_spawned) //check worker returned to pool and was assigned another thread
REPORT_RESULT(0);
else
REPORT_RESULT(num_workers);
#else
std::cout <<
"Worker pool size: " << num_workers <<
", Number of threads spawned: " << num_threads_spawned
<< "." << std::endl;
assert(num_threads_spawned_extra != 0);
assert(num_workers < num_threads_spawned);
#endif
}
}

//Spawn a detached thread every 0.1s. After 0.3s Check that the number of workers are less than the number of spawned threads
int main(int argc, char** argv) {
EM_ASM(
let thread_check = 0;
const max_thread_check = 5; //fail the test if the number of threads doesn't go down after checking this many times
const threads_to_spawn = 3;
let threads_to_spawn_extra = 0;

//Spawn some detached threads
for (let i=0; i<threads_to_spawn; i++) {
setTimeout(() => { _spawn_a_thread(); }, i*100);
}

//Check if a worker is free every threads_to_spawn*100 ms, or until max_thread_check is exceeded
const SpawnMoreThreads = setInterval(() => {
if (PThread.unusedWorkerPool.length > 0) { //Spawn a thread if a worker is available
_spawn_a_thread();
threads_to_spawn_extra++;
}
if (thread_check++ > max_thread_check || threads_to_spawn_extra > 0) {
clearInterval(SpawnMoreThreads);
_count_threads(threads_to_spawn, threads_to_spawn_extra);
}
}, threads_to_spawn*100);
);
}
5 changes: 5 additions & 0 deletions tests/test_browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3622,6 +3622,11 @@ def test_pthread_nested_spawns(self):
def test_pthread_join(self):
self.btest(path_from_root('tests', 'pthread', 'test_pthread_join.cpp'), expected='6765', args=['-O3', '-s', 'USE_PTHREADS=1', '-s', 'PTHREAD_POOL_SIZE=8'])

# Test that threads can rejoin the pool once detached and finished
@requires_threads
def test_std_thread_detach(self):
self.btest(path_from_root('tests', 'pthread', 'test_std_thread_detach.cpp'), expected='0', args=['-std=c++11', '-s', 'USE_PTHREADS=1'])

# Test pthread_cancel() operation
@requires_threads
def test_pthread_cancel(self):
Expand Down

0 comments on commit bb2428b

Please sign in to comment.