Skip to content

Commit

Permalink
Fix wasm workers under node
Browse files Browse the repository at this point in the history
- Use callUserCallback to invoke callback in _wasmWorkerRunPostMessage.
  Without this calls to exit/emscripten_force_exit within the callback
  don't work as expected (they cause unhandled exception errors).
- Fix `onmessage` handling under node so that the message payload always
  arrives as the `data` member of the message.
- Update a few of the wasm workers tests do they actually exit (required
  for running tests under node).
  • Loading branch information
sbc100 committed Oct 18, 2024
1 parent 10cb9d4 commit 172855b
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 36 deletions.
19 changes: 9 additions & 10 deletions src/library_wasm_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,13 @@ addToLibrary({
},

// Executes a wasm function call received via a postMessage.
$_wasmWorkerRunPostMessage__deps: ['$callUserCallback'],
$_wasmWorkerRunPostMessage: (e) => {
// '_wsc' is short for 'wasm call', trying to use an identifier name that
// will never conflict with user code
#if ENVIRONMENT_MAY_BE_NODE
// In Node.js environment, message event 'e' containing the actual data sent,
// while in the browser environment it's contained by 'e.data'.
let data = ENVIRONMENT_IS_NODE ? e : e.data;
#else
let data = e.data;
#endif
let wasmCall = data['_wsc'];
wasmCall && getWasmTableEntry(wasmCall)(...data['x']);
wasmCall && callUserCallback(() => getWasmTableEntry(wasmCall)(...data['x']));
},

// src/postamble_minimal.js brings this symbol in to the build, and calls this
Expand Down Expand Up @@ -209,6 +204,12 @@ if (ENVIRONMENT_IS_WASM_WORKER
#endif
});
worker.onmessage = _wasmWorkerRunPostMessage;
#if ENVIRONMENT_MAY_BE_NODE
if (ENVIRONMENT_IS_NODE) {
/** @suppress {checkTypes} */
worker.on('message', (msg) => worker.onmessage({ data: msg }));
}
#endif
return _wasmWorkersID++;
},

Expand All @@ -226,9 +227,7 @@ if (ENVIRONMENT_IS_WASM_WORKER
#if ASSERTIONS
assert(!ENVIRONMENT_IS_WASM_WORKER, 'emscripten_terminate_all_wasm_workers() cannot be called from a Wasm Worker: only the main browser thread has visibility to terminate all Workers!');
#endif
Object.values(_wasmWorkers).forEach((worker) => {
worker.terminate();
});
Object.values(_wasmWorkers).forEach((worker) => worker.terminate());
_wasmWorkers = {};
},

Expand Down
2 changes: 1 addition & 1 deletion src/runtime_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ if (ENVIRONMENT_IS_PTHREAD) {
// Create as web-worker-like an environment as we can.

var parentPort = worker_threads['parentPort'];
parentPort.on('message', (data) => onmessage({ data: data }));
parentPort.on('message', (msg) => onmessage({ data: msg }));

Object.assign(globalThis, {
self: global,
Expand Down
18 changes: 15 additions & 3 deletions src/wasm_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,19 @@ if (ENVIRONMENT_IS_NODE) {

var parentPort = nodeWorkerThreads.parentPort;

parentPort.on('message', (data) => typeof onmessage === "function" && onmessage({ data: data }));
parentPort.on('message', (msg) => global.onmessage?.({ data: msg }));

// Weak map of handle functions to their wrapper. Used to implement
// addEventListener/removeEventListener.
var wrappedHandlers = new WeakMap();
function wrapMsgHandler(h) {
var f = wrappedHandlers.get(h)
if (!f) {
f = (msg) => h({data: msg});
wrappedHandlers.set(h, f);
}
return f;
}

var fs = require('fs');
var vm = require('vm');
Expand All @@ -28,8 +40,8 @@ if (ENVIRONMENT_IS_NODE) {
importScripts: (f) => vm.runInThisContext(fs.readFileSync(f, 'utf8'), {filename: f}),
postMessage: (msg) => parentPort.postMessage(msg),
performance: global.performance || { now: Date.now },
addEventListener: (name, handler) => parentPort.on(name, handler),
removeEventListener: (name, handler) => parentPort.off(name, handler),
addEventListener: (name, handler) => parentPort.on(name, wrapMsgHandler(handler)),
removeEventListener: (name, handler) => parentPort.off(name, wrapMsgHandler(handler)),
});
}
#endif // ENVIRONMENT_MAY_BE_NODE
Expand Down
14 changes: 7 additions & 7 deletions test/test_browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4981,21 +4981,21 @@ def test_system(self):
# Tests the hello_wasm_worker.c documentation example code.
@also_with_minimal_runtime
def test_wasm_worker_hello(self):
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS'])
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS'])

def test_wasm_worker_hello_minimal_runtime_2(self):
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS', '-sMINIMAL_RUNTIME=2'])
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS', '-sMINIMAL_RUNTIME=2'])

# Tests Wasm Workers build in Wasm2JS mode.
@requires_wasm2js
@also_with_minimal_runtime
def test_wasm_worker_hello_wasm2js(self):
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS', '-sWASM=0'])
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS', '-sWASM=0'])

# Tests the WASM_WORKERS=2 build mode, which embeds the Wasm Worker bootstrap JS script file to the main JS file.
@also_with_minimal_runtime
def test_wasm_worker_embedded(self):
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS=2'])
def test_wasm_worker_hello_embedded(self):
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS=2'])

# Tests that it is possible to call emscripten_futex_wait() in Wasm Workers.
@parameterized({
Expand Down Expand Up @@ -5059,7 +5059,7 @@ def test_wasm_worker_sleep(self):
# Tests emscripten_terminate_wasm_worker()
@also_with_minimal_runtime
def test_wasm_worker_terminate(self):
self.btest('wasm_worker/terminate_wasm_worker.c', expected='0', args=['-sWASM_WORKERS'])
self.btest_exit('wasm_worker/terminate_wasm_worker.c', args=['-sWASM_WORKERS'])

# Tests emscripten_terminate_all_wasm_workers()
@also_with_minimal_runtime
Expand Down Expand Up @@ -5133,7 +5133,7 @@ def test_wasm_worker_lock_wait2(self):
# Tests emscripten_lock_async_acquire() function.
@also_with_minimal_runtime
def test_wasm_worker_lock_async_acquire(self):
self.btest('wasm_worker/lock_async_acquire.c', expected='0', args=['--closure=1', '-sWASM_WORKERS'])
self.btest_exit('wasm_worker/lock_async_acquire.c', args=['--closure=1', '-sWASM_WORKERS'])

# Tests emscripten_lock_busyspin_wait_acquire() in Worker and main thread.
@also_with_minimal_runtime
Expand Down
6 changes: 6 additions & 0 deletions test/test_other.py
Original file line number Diff line number Diff line change
Expand Up @@ -13901,6 +13901,12 @@ def test_debug_opt_warning(self, should_fail, args):
else:
self.run_process([EMCC, test_file('hello_world.c'), '-Werror'] + args)

def test_wasm_worker_hello(self):
self.do_runf(test_file('wasm_worker/hello_wasm_worker.c'), emcc_args=['-sWASM_WORKERS'])

def test_wasm_worker_terminate(self):
self.do_runf(test_file('wasm_worker/terminate_wasm_worker.c'), emcc_args=['-sWASM_WORKERS'])

@also_with_minimal_runtime
def test_wasm_worker_closure(self):
self.run_process([EMCC, test_file('wasm_worker/lock_async_acquire.c'), '-O2', '-sWASM_WORKERS', '--closure=1'])
Expand Down
11 changes: 8 additions & 3 deletions test/wasm_worker/hello_wasm_worker.c
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
#include <emscripten/emscripten.h>
#include <emscripten/console.h>
#include <emscripten/em_asm.h>
#include <emscripten/wasm_worker.h>
#include <assert.h>

// This is the code example in site/source/docs/api_reference/wasm_workers.rst
void do_exit() {
emscripten_out("do_exit");
emscripten_terminate_all_wasm_workers();
emscripten_force_exit(0);
}

void run_in_worker() {
emscripten_out("Hello from wasm worker!\n");
EM_ASM(typeof checkStackCookie == 'function' && checkStackCookie());
#ifdef REPORT_RESULT
REPORT_RESULT(0);
#endif
emscripten_wasm_worker_post_function_v(EMSCRIPTEN_WASM_WORKER_ID_PARENT, do_exit);
}

int main() {
emscripten_wasm_worker_t worker = emscripten_malloc_wasm_worker(/*stack size: */1024);
assert(worker);
emscripten_wasm_worker_post_function_v(worker, run_in_worker);
emscripten_exit_with_live_runtime();
}
19 changes: 13 additions & 6 deletions test/wasm_worker/lock_async_acquire.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ bool testFinished = false;
int numTimesMainThreadAcquiredLock = 0;
int numTimesWasmWorkerAcquiredLock = 0;

void do_exit() {
emscripten_out("do_exit");
emscripten_terminate_all_wasm_workers();
emscripten_force_exit(0);
}

void work() {
// emscripten_out("work");
volatile int x = sharedState0;
Expand All @@ -37,18 +43,17 @@ void work() {
sharedState0 = x;
} else {
y = x + 1;
if (emscripten_current_thread_is_wasm_worker())
if (emscripten_current_thread_is_wasm_worker()) {
emscripten_wasm_worker_sleep(/*nsecs=*/(rand()%100000));
}
sharedState1 = y;

if (y > 100 && numTimesMainThreadAcquiredLock && numTimesWasmWorkerAcquiredLock) {
if (!testFinished) {
testFinished = true;
emscripten_out("test finished");
#ifdef REPORT_RESULT
REPORT_RESULT(0);
#endif
emscripten_wasm_worker_post_function_v(EMSCRIPTEN_WASM_WORKER_ID_PARENT, do_exit);
}
testFinished = true;
}
}
}
Expand All @@ -75,8 +80,9 @@ void schedule_work(void *userData) {
// emscripten_out("sync lock acquired");
work();
emscripten_lock_release(&lock);
if (!testFinished)
if (!testFinished) {
emscripten_set_timeout(schedule_work, 0, 0);
}
} else {
emscripten_lock_async_acquire(&lock, lock_async_acquired, (void*)42, EMSCRIPTEN_WAIT_ASYNC_INFINITY);
}
Expand All @@ -94,4 +100,5 @@ int main() {
}

schedule_work(0);
emscripten_exit_with_live_runtime();
}
9 changes: 3 additions & 6 deletions test/wasm_worker/terminate_wasm_worker.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <emscripten/emscripten.h>
#include <emscripten/console.h>
#include <emscripten/em_asm.h>
#include <emscripten/em_js.h>
Expand All @@ -13,17 +14,13 @@ static volatile int worker_started = 0;
void this_function_should_not_be_called(void *userData) {
worker_started = -1;
emscripten_err("this_function_should_not_be_called");
#ifdef REPORT_RESULT
REPORT_RESULT(1/*fail*/);
#endif
emscripten_force_exit(1);
}

void test_passed(void *userData) {
if (worker_started == 1) {
emscripten_err("test_passed");
#ifdef REPORT_RESULT
REPORT_RESULT(0/*ok*/);
#endif
emscripten_force_exit(0);
}
}

Expand Down

0 comments on commit 172855b

Please sign in to comment.