From bb2428bbc650ae672fb2b0593fd1f9f78cc46eac Mon Sep 17 00:00:00 2001 From: Tim Lander Date: Tue, 21 May 2019 05:12:45 +0800 Subject: [PATCH] Return detached threads to the pool (#8286) Once detached threads are finished their execution they emit the 'exit' command. Instead of a noop they should rejoin the pool. Resolves #8201. --- src/library_pthread.js | 31 ++++++---- tests/pthread/test_std_thread_detach.cpp | 76 ++++++++++++++++++++++++ tests/test_browser.py | 5 ++ 3 files changed, 99 insertions(+), 13 deletions(-) create mode 100644 tests/pthread/test_std_thread_detach.cpp diff --git a/src/library_pthread.js b/src/library_pthread.js index 79a7aa1d76c18..3d13eed20bfd5 100644 --- a/src/library_pthread.js +++ b/src/library_pthread.js @@ -213,7 +213,14 @@ var LibraryPThread = { pthread.stackBase = 0; if (pthread.worker) pthread.worker.pthread = null; }, - + returnWorkerToPool: function(worker) { + delete PThread.pthreads[worker.pthread.thread]; + //Note: worker is intentionally not terminated so the pool can dynamically grow. + PThread.unusedWorkerPool.push(worker); + PThread.runningWorkers.splice(PThread.runningWorkers.indexOf(worker.pthread), 1); // Not a running Worker anymore + PThread.freeThreadData(worker.pthread); + worker.pthread = undefined; // Detach the worker from the pthread object, and return it to the worker pool as an unused worker. + }, receiveObjectTransfer: function(data) { #if OFFSCREENCANVAS_SUPPORT if (typeof GL !== 'undefined') { @@ -294,17 +301,16 @@ var LibraryPThread = { } else if (d.cmd === 'alert') { alert('Thread ' + d.threadId + ': ' + d.text); } else if (d.cmd === 'exit') { - // Thread is exiting, no-op here + var detached = worker.pthread && Atomics.load(HEAPU32, (worker.pthread.thread + {{{ C_STRUCTS.pthread.detached }}}) >> 2); + if (detached) { + PThread.returnWorkerToPool(worker); + } } else if (d.cmd === 'exitProcess') { // A pthread has requested to exit the whole application process (runtime). Module['noExitRuntime'] = false; exit(d.returnCode); } else if (d.cmd === 'cancelDone') { - PThread.freeThreadData(worker.pthread); - worker.pthread = undefined; // Detach the worker from the pthread object, and return it to the worker pool as an unused worker. - PThread.unusedWorkerPool.push(worker); - // TODO: Free if detached. - PThread.runningWorkers.splice(PThread.runningWorkers.indexOf(worker.pthread), 1); // Not a running Worker anymore. + PThread.returnWorkerToPool(worker); } else if (d.cmd === 'objectTransfer') { PThread.receiveObjectTransfer(e.data); } else if (e.data.target === 'setimmediate') { @@ -381,11 +387,10 @@ var LibraryPThread = { if (!pthread_ptr) throw 'Internal Error! Null pthread_ptr in _cleanup_thread!'; {{{ makeSetValue('pthread_ptr', C_STRUCTS.pthread.self, 0, 'i32') }}}; var pthread = PThread.pthreads[pthread_ptr]; - var worker = pthread.worker; - PThread.freeThreadData(pthread); - worker.pthread = undefined; // Detach the worker from the pthread object, and return it to the worker pool as an unused worker. - PThread.unusedWorkerPool.push(worker); - PThread.runningWorkers.splice(PThread.runningWorkers.indexOf(worker.pthread), 1); // Not a running Worker anymore. + if (pthread) { + var worker = pthread.worker; + PThread.returnWorkerToPool(worker); + } }, _cancel_thread: function(pthread_ptr) { @@ -729,7 +734,7 @@ var LibraryPThread = { Atomics.store(HEAPU32, (thread + {{{ C_STRUCTS.pthread.detached }}} ) >> 2, 1); // Mark the thread as detached. if (!ENVIRONMENT_IS_PTHREAD) __cleanup_thread(thread); - else postMessage({ cmd: 'cleanupThread', thread: thread}); + else postMessage({ cmd: 'cleanupThread', thread: thread }); return 0; } // TODO HACK! Replace the _js variant with just _pthread_testcancel: diff --git a/tests/pthread/test_std_thread_detach.cpp b/tests/pthread/test_std_thread_detach.cpp new file mode 100644 index 0000000000000..934de368f30dc --- /dev/null +++ b/tests/pthread/test_std_thread_detach.cpp @@ -0,0 +1,76 @@ +// Copyright 2019 The Emscripten Authors. All rights reserved. +// Emscripten is available under two separate licenses, the MIT license and the +// University of Illinois/NCSA Open Source License. Both these licenses can be +// found in the LICENSE file. + +#include +#include +#include +#include + +#ifndef REPORT_RESULT +#include +#endif + +extern "C" { +//Create a thread that does some work +void EMSCRIPTEN_KEEPALIVE spawn_a_thread() { + std::thread( [] { + double d=0; + for (int i=0; i<10; i++) //simulate work + d += (i%2 ? sqrt((int)(rand())) : (-1)*sqrt((int)(rand()))); + } ).detach(); +} + + +//Check that the number of workers is less than the number of spawned threads. +void EMSCRIPTEN_KEEPALIVE count_threads(int num_threads_spawned, int num_threads_spawned_extra) { + num_threads_spawned += num_threads_spawned_extra; + int num_workers = EM_ASM_INT({ + return PThread.runningWorkers.length + PThread.unusedWorkerPool.length; + }); + +#ifdef REPORT_RESULT + if (num_threads_spawned_extra == 0) //check extra thread spawned + REPORT_RESULT(-1); + if (num_workers < num_threads_spawned) //check worker returned to pool and was assigned another thread + REPORT_RESULT(0); + else + REPORT_RESULT(num_workers); +#else + std::cout << + "Worker pool size: " << num_workers << + ", Number of threads spawned: " << num_threads_spawned + << "." << std::endl; + assert(num_threads_spawned_extra != 0); + assert(num_workers < num_threads_spawned); +#endif +} +} + +//Spawn a detached thread every 0.1s. After 0.3s Check that the number of workers are less than the number of spawned threads +int main(int argc, char** argv) { + EM_ASM( + let thread_check = 0; + const max_thread_check = 5; //fail the test if the number of threads doesn't go down after checking this many times + const threads_to_spawn = 3; + let threads_to_spawn_extra = 0; + + //Spawn some detached threads + for (let i=0; i { _spawn_a_thread(); }, i*100); + } + + //Check if a worker is free every threads_to_spawn*100 ms, or until max_thread_check is exceeded + const SpawnMoreThreads = setInterval(() => { + if (PThread.unusedWorkerPool.length > 0) { //Spawn a thread if a worker is available + _spawn_a_thread(); + threads_to_spawn_extra++; + } + if (thread_check++ > max_thread_check || threads_to_spawn_extra > 0) { + clearInterval(SpawnMoreThreads); + _count_threads(threads_to_spawn, threads_to_spawn_extra); + } + }, threads_to_spawn*100); + ); +} diff --git a/tests/test_browser.py b/tests/test_browser.py index b429e2153733b..45fca1dc81d9c 100644 --- a/tests/test_browser.py +++ b/tests/test_browser.py @@ -3622,6 +3622,11 @@ def test_pthread_nested_spawns(self): def test_pthread_join(self): self.btest(path_from_root('tests', 'pthread', 'test_pthread_join.cpp'), expected='6765', args=['-O3', '-s', 'USE_PTHREADS=1', '-s', 'PTHREAD_POOL_SIZE=8']) + # Test that threads can rejoin the pool once detached and finished + @requires_threads + def test_std_thread_detach(self): + self.btest(path_from_root('tests', 'pthread', 'test_std_thread_detach.cpp'), expected='0', args=['-std=c++11', '-s', 'USE_PTHREADS=1']) + # Test pthread_cancel() operation @requires_threads def test_pthread_cancel(self):