Worker thread support #6

Open
willfrew opened this issue Sep 15, 2022 · 5 comments

Comments

@willfrew

Hi! I'm trying to use this module in a worker thread but am hitting the following error:

FATAL ERROR: HandleScope::HandleScope Entering the V8 API without proper locking in place
 1: 0xb6dd00 node::Abort() [/usr/local/bin/node]
 2: 0xa7da28 node::FatalError(char const*, char const*) [/usr/local/bin/node]
 3: 0xd471fa v8::Utils::ReportApiFailure(char const*, char const*) [/usr/local/bin/node]
 4: 0xd4875c v8::HandleScope::HandleScope(v8::Isolate*) [/usr/local/bin/node]
 5: 0xaac89c node::InternalCallbackScope::InternalCallbackScope(node::Environment*, v8::Local<v8::Object>, node::async_context const&, int) [/usr/local/bin/node]
 6: 0xaad0cf node::InternalMakeCallback(node::Environment*, v8::Local<v8::Object>, v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*, node::async_context) [/usr/local/bin/node]
 7: 0xaad2e8 node::MakeCallback(v8::Isolate*, v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*, node::async_context) [/usr/local/bin/node]
 8: 0x7f7fb00c903e PosixMQ::poll_cb(uv_poll_s*, int, int) [/app/node_modules/posix-mq/build/Release/posixmq.node]
 9: 0x165e784  [/usr/local/bin/node]
10: 0x164c998 uv_run [/usr/local/bin/node]
11: 0xaad9e5 node::SpinEventLoop(node::Environment*) [/usr/local/bin/node]
12: 0xbb1c36 node::NodeMainInstance::Run() [/usr/local/bin/node]
13: 0xb29fe2 node::LoadSnapshotDataAndRun(node::SnapshotData const**, node::InitializationResult*) [/usr/local/bin/node]
14: 0xb2d54d node::Start(int, char**) [/usr/local/bin/node]
15: 0x7f7fc9a7a083 __libc_start_main [/lib/x86_64-linux-gnu/libc.so.6]
16: 0xaaa1ce _start [/usr/local/bin/node]

Tested with Node v18 & v16.
The above stacktrace was from v18, but with v16 it's almost identical.

I did some surface research and saw some description of what would be needed to make this a 'context-aware addon' here: https://nodejs.org/api/addons.html#context-aware-addons

And additionally in the Nan repo this discussion led me to the NAN_MODULE_WORKER_ENABLED macro.

I'm going to try and pull the repo and see if I can make it work, but I'm a little bit in over my head with C++ and if significant changes are needed I'll probably not be able to do it :'D Any help / tips would be amazing!

@mikeokner
Owner

Hi @willfrew, thanks for the thorough information, and I appreciate you trying to create a fix.

Can you share a basic code snippet that leads to that error? That's not an error I've encountered before but I can also take a look once I can reproduce.

@willfrew
Author

Hi @mikeokner!

I tried to create a minimal repro that throws the same error as above. However, my first attempt yielded segfaults instead, which may still be useful but are obviously not quite the same thing.

Repo here: https://github.com/willfrew/posix-mq-repro

Run ./run.sh to run in docker (set to node v18), or npm install; node ./repro.js to run locally.

@willfrew
Author

For completeness here, the code I have that I'm seeing segfaults with is:

const { Worker, parentPort } = require('worker_threads');

if (parentPort) {
  console.log('In worker');
  const PosixMQ = require('posix-mq');
  const mq = new PosixMQ();
  try {
    mq.open({
      name: '/test-mq',
      mode: '0700',
      create: true,
      exclusive: true,
    });
    mq.unlink();
  } catch (error) {
    console.error(error);
  }
} else {
  // main
  const worker = new Worker(__filename);
}

I was just doing a bit more testing, and I see that if I move the require up so that the module is required in both the main and worker threads, i.e.:

const { Worker, parentPort } = require('worker_threads');
const PosixMQ = require('posix-mq');

if (parentPort) {
  console.log('In worker');
...

The error I then see is instead:

events.js:377
      throw er; // Unhandled 'error' event
      ^
Error: Module did not self-register: '<snip>/posix-mq-repro/node_modules/posix-mq/build/Release/posixmq.node'.
    at Object.Module._extensions..node (internal/modules/cjs/loader.js:1144:18)
    at Module.load (internal/modules/cjs/loader.js:950:32)
    at Function.Module._load (internal/modules/cjs/loader.js:790:12)
    at Module.require (internal/modules/cjs/loader.js:974:19)
    at require (internal/modules/cjs/helpers.js:101:18)
    at Object.<anonymous> (<snip>/posix-mq-repro/node_modules/posix-mq/lib/index.js:1:15)
    at Module._compile (internal/modules/cjs/loader.js:1085:14)
    at Object.Module._extensions..js (internal/modules/cjs/loader.js:1114:10)
    at Module.load (internal/modules/cjs/loader.js:950:32)
    at Function.Module._load (internal/modules/cjs/loader.js:790:12)
Emitted 'error' event on process instance at:
    at emitUnhandledRejectionOrErr (internal/event_target.js:579:11)
    at MessagePort.[nodejs.internal.kHybridDispatch] (internal/event_target.js:403:9)
    at MessagePort.exports.emitMessage (internal/per_context/messageport.js:18:26) {
  code: 'ERR_DLOPEN_FAILED'
}

Unfortunately, I've been unable to reproduce in isolation the error I'm seeing in my main repo, and I'm unable to share that code. Sorry!
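For anyone wanting to probe this without taking down the main thread, here is a minimal sketch that tries to require a module inside a fresh worker thread and reports the outcome. The helper name `tryRequireInWorker` is hypothetical and not part of posix-mq. It only catches JavaScript-level load failures such as the ERR_DLOPEN_FAILED error above; a native abort or segfault like the ones reported earlier in this thread terminates the whole process and cannot be caught this way.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical helper (not part of posix-mq): try to require `moduleName`
// inside a fresh worker thread and report whether loading succeeded.
function tryRequireInWorker(moduleName) {
  return new Promise((resolve) => {
    // The worker body is inlined via `eval: true`; it only requires the module.
    const worker = new Worker(
      "require(require('worker_threads').workerData);",
      { eval: true, workerData: moduleName }
    );
    // Load failures thrown inside the worker arrive here instead of becoming
    // an unhandled 'error' event in the main thread.
    worker.once('error', (error) => resolve({ ok: false, error }));
    // 'exit' also fires after 'error'; resolving a second time is a no-op.
    worker.once('exit', () => resolve({ ok: true }));
  });
}
```

Calling `tryRequireInWorker('posix-mq').then(console.log)` after requiring the module in the main thread should, under these assumptions, print an object with `ok: false` and the load error rather than crashing.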

@mikeokner
Owner

Will,

I was able to reproduce the original issue.

From what I can tell, the best way to add support for worker_threads is to switch from NAN to N-API (Node-API), which is a somewhat involved process. I found these issues to be helpful in understanding the situation:

I'll look more at getting this ported, but it could take a little while.

Another approach that I validated works, if you are able to use something other than worker_threads, is the tiny-worker library, which uses processes instead of threads.

const Worker = require("tiny-worker");


const worker = new Worker(function() {
  self.onmessage = async function(event) {
    const idx = event.data;
    console.log(`${idx} - In worker.  Creating posix mq...`);
    try {
      const PosixMQ = require('posix-mq');
      const mq = new PosixMQ();
      mq.open({
        name: '/test-pmq',
        mode: '0777',
        create: true,
        maxmsgs: 10,
        msgsize: 8
      });

      let wbuf = Buffer.alloc(1);
      wbuf[0] = Math.floor(Math.random() * 93) + 33;
      console.log(`${idx} - writing `+ wbuf[0] +" ('"+ String.fromCharCode(wbuf[0]) +"')...");
      mq.push(wbuf);
      console.log(`${idx} - Done writing.`);

      // sleep for a bit to validate parallel execution
      await new Promise(r => setTimeout(r, Math.random()*5*1000));

      console.log(`${idx} - Reading from queue...`);
      let rbuf = Buffer.alloc(mq.msgsize);
      let n = mq.shift(rbuf);
      console.log(`${idx} - got msg: '`+ rbuf.toString('utf8', 0, n) + `' (${n} bytes)`);

      //mq.unlink();  // don't unlink or it will remove the handle for other workers
      mq.close();
      console.log(`${idx} - Queue opened & closed successfully.  Worker done.`);

      // Send update back to main
      postMessage(`${idx} - Message - done with all operations.`);
    } catch (error) {
      console.error(`${idx} - Worker failed: `, error);
    }
  }
});

// Send 4 jobs to the worker; the async handler lets them overlap
worker.postMessage(1);
worker.postMessage(2);
worker.postMessage(3);
worker.postMessage(4);

// Handle any response from workers
worker.onmessage = function(event) {
  console.log("Message received: " + event.data);
};

Running this looks like

$ node test_tiny_worker.js
1 - In worker.  Creating posix mq...
1 - writing 55 ('7')...
1 - Done writing.
2 - In worker.  Creating posix mq...
2 - writing 106 ('j')...
2 - Done writing.
3 - In worker.  Creating posix mq...
3 - writing 64 ('@')...
3 - Done writing.
4 - In worker.  Creating posix mq...
4 - writing 69 ('E')...
4 - Done writing.
3 - Reading from queue...
3 - got msg: '7' (1 bytes)
3 - Queue opened & closed successfully.  Worker done.
Message received: 3 - Message - done with all operations.
4 - Reading from queue...
4 - got msg: 'j' (1 bytes)
4 - Queue opened & closed successfully.  Worker done.
Message received: 4 - Message - done with all operations.
1 - Reading from queue...
1 - got msg: '@' (1 bytes)
1 - Queue opened & closed successfully.  Worker done.
Message received: 1 - Message - done with all operations.
2 - Reading from queue...
2 - got msg: 'E' (1 bytes)
2 - Queue opened & closed successfully.  Worker done.
Message received: 2 - Message - done with all operations.
^C

@willfrew
Author

Hey @mikeokner, just wanted to update that I ended up switching to another IPC transport to work around this, so there's no urgent need for this from me! I definitely understand that switching to N-API is non-trivial.

FWIW, my use case prevented me from using process-based concurrency (and therefore tiny-worker) because I am transferring large TypedArrays between the worker threads and then passing some relatively small messages out to another process with posix-mq.
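To illustrate why threads matter for this kind of workload, here is a rough sketch of moving a large TypedArray into a worker thread without copying it and getting a small summary message back. The names `summarizeInWorker` and `workerSource` are hypothetical, and the summing step is only a stand-in for real work. Forwarding the small result to another process is left out, since that is the part posix-mq would handle.

```javascript
const { Worker } = require('worker_threads');

// Worker body, inlined with `eval: true` to keep the sketch self-contained.
const workerSource = `
  const { parentPort } = require('worker_threads');
  parentPort.once('message', (samples) => {
    // Reduce the large array to a small summary message.
    let sum = 0;
    for (const value of samples) sum += value;
    parentPort.postMessage({ length: samples.length, sum });
  });
`;

// Hypothetical helper: hand `samples` (a TypedArray) to a worker thread and
// resolve with the small summary object the worker sends back.
function summarizeInWorker(samples) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true });
    worker.once('message', (summary) => {
      resolve(summary);
      worker.terminate();
    });
    worker.once('error', reject);
    // Listing the underlying ArrayBuffer in the transfer list moves it to the
    // worker instead of copying it; `samples` is detached in this thread.
    worker.postMessage(samples, [samples.buffer]);
  });
}
```

After the `postMessage` call the sender's view is empty (`samples.byteLength` is 0), which is the zero-copy hand-off between threads. Between separate processes the same data would have to be serialized and copied.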
