diff --git a/node/README.md b/node/README.md index ddad19dd68de..abc1cd3fd6c9 100644 --- a/node/README.md +++ b/node/README.md @@ -124,15 +124,15 @@ $ deno run --allow-read --allow-net --allow-write node/_tools/setup.ts -n To run the tests you have set up, do the following: -```zsh -$ deno test --allow-read --allow-run node/_tools/test.ts +```shellsession +$ deno test -A --unstable node/_tools/test.ts ``` If you want to run specific Node.js test files, you can use the following command ```shellsession -$ deno test -A node/_tools/test.ts -- +$ deno test -A --unstable node/_tools/test.ts -- ``` For example, if you want to run only @@ -140,14 +140,14 @@ For example, if you want to run only use: ```shellsession -$ deno test -A node/_tools/test.ts -- test-event-emitter-check-listener-leaks.js +$ deno test -A --unstable node/_tools/test.ts -- test-event-emitter-check-listener-leaks.js ``` If you want to run all test files which contains `event-emitter` in filename, then you can use: ```shellsession -$ deno test -A node/_tools/test.ts -- event-emitter +$ deno test -A --unstable node/_tools/test.ts -- event-emitter ``` The test should be passing with the latest deno, so if the test fails, try the diff --git a/node/_events.d.ts b/node/_events.d.ts index 7d35a77602c4..886ea87f4d8e 100644 --- a/node/_events.d.ts +++ b/node/_events.d.ts @@ -289,6 +289,10 @@ interface StaticEventEmitterOptions { * @since v0.1.26 */ export class EventEmitter { + _events: any; + _eventsCount: any; + _maxListeners: any; + /** * Alias for `emitter.on(eventName, listener)`. * @since v0.1.26 diff --git a/node/_fs/_fs_access_test.ts b/node/_fs/_fs_access_test.ts index 215d7d0d8532..aca2a5f33248 100644 --- a/node/_fs/_fs_access_test.ts +++ b/node/_fs/_fs_access_test.ts @@ -8,7 +8,7 @@ Deno.test( async () => { const file = await Deno.makeTempFile(); try { - Deno.chmod(file, 0o600); + await Deno.chmod(file, 0o600); await fs.promises.access(file, fs.constants.R_OK); await fs.promises.access(file, fs.constants.W_OK); await assertRejects(async () => { @@ -40,7 +40,7 @@ Deno.test( () => { const file = Deno.makeTempFileSync(); try { - Deno.chmod(file, 0o600); + Deno.chmodSync(file, 0o600); fs.accessSync(file, fs.constants.R_OK); fs.accessSync(file, fs.constants.W_OK); assertThrows(() => { diff --git a/node/_tools/TODO.md b/node/_tools/TODO.md index f985aa81e7fc..7304da2c6bf3 100644 --- a/node/_tools/TODO.md +++ b/node/_tools/TODO.md @@ -3,7 +3,7 @@ NOTE: This file should not be manually edited. Please edit `config.json` and run `deno task node:setup` instead. -Total: 2811 +Total: 2804 - [abort/test-abort-backtrace.js](https://github.com/nodejs/node/tree/v18.12.1/test/abort/test-abort-backtrace.js) - [abort/test-abort-fatal-error.js](https://github.com/nodejs/node/tree/v18.12.1/test/abort/test-abort-fatal-error.js) @@ -363,8 +363,6 @@ Total: 2811 - [parallel/test-cluster-bind-privileged-port.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-bind-privileged-port.js) - [parallel/test-cluster-bind-twice.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-bind-twice.js) - [parallel/test-cluster-call-and-destroy.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-call-and-destroy.js) -- [parallel/test-cluster-child-index-dgram.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-child-index-dgram.js) -- [parallel/test-cluster-child-index-net.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-child-index-net.js) - [parallel/test-cluster-concurrent-disconnect.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-concurrent-disconnect.js) - [parallel/test-cluster-cwd.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-cwd.js) - [parallel/test-cluster-dgram-1.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-dgram-1.js) @@ -374,11 +372,8 @@ Total: 2811 - [parallel/test-cluster-dgram-reuse.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-dgram-reuse.js) - [parallel/test-cluster-disconnect-before-exit.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-before-exit.js) - [parallel/test-cluster-disconnect-exitedAfterDisconnect-race.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-exitedAfterDisconnect-race.js) -- [parallel/test-cluster-disconnect-idle-worker.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-idle-worker.js) - [parallel/test-cluster-disconnect-leak.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-leak.js) - [parallel/test-cluster-disconnect-race.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-race.js) -- [parallel/test-cluster-disconnect-unshared-tcp.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-unshared-tcp.js) -- [parallel/test-cluster-disconnect-unshared-udp.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-unshared-udp.js) - [parallel/test-cluster-disconnect-with-no-workers.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-with-no-workers.js) - [parallel/test-cluster-disconnect.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect.js) - [parallel/test-cluster-eaccess.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-eaccess.js) @@ -403,7 +398,6 @@ Total: 2811 - [parallel/test-cluster-primary-error.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-primary-error.js) - [parallel/test-cluster-primary-kill.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-primary-kill.js) - [parallel/test-cluster-process-disconnect.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-process-disconnect.js) -- [parallel/test-cluster-rr-domain-listen.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-rr-domain-listen.js) - [parallel/test-cluster-rr-ref.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-rr-ref.js) - [parallel/test-cluster-send-deadlock.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-send-deadlock.js) - [parallel/test-cluster-send-handle-twice.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-send-handle-twice.js) @@ -430,7 +424,6 @@ Total: 2811 - [parallel/test-cluster-worker-handle-close.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-handle-close.js) - [parallel/test-cluster-worker-init.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-init.js) - [parallel/test-cluster-worker-isconnected.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-isconnected.js) -- [parallel/test-cluster-worker-isdead.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-isdead.js) - [parallel/test-cluster-worker-kill-signal.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-kill-signal.js) - [parallel/test-cluster-worker-kill.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-kill.js) - [parallel/test-cluster-worker-no-exit.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-no-exit.js) diff --git a/node/_tools/config.json b/node/_tools/config.json index f8b7f2c73de6..523f95c9e0fc 100644 --- a/node/_tools/config.json +++ b/node/_tools/config.json @@ -202,6 +202,13 @@ "test-child-process-spawnsync-validation-errors.js", "test-child-process-spawnsync.js", "test-client-request-destroy.js", + "test-cluster-child-index-dgram.js", + "test-cluster-child-index-net.js", + "test-cluster-disconnect-idle-worker.js", + "test-cluster-disconnect-unshared-tcp.js", + "test-cluster-disconnect-unshared-udp.js", + "test-cluster-rr-domain-listen.js", + "test-cluster-worker-isdead.js", "test-console-async-write-error.js", "test-console-group.js", "test-console-instance.js", diff --git a/node/_tools/require.mjs b/node/_tools/require.mjs new file mode 100644 index 000000000000..8cfd9624a5a3 --- /dev/null +++ b/node/_tools/require.mjs @@ -0,0 +1,17 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +/** + * This module is used as an entry point for test files utilizing the cluster + * module which forks processes and cannot use `.ts` files due to + * incompatibility with Deno's Node module resolution. + * See https://github.com/denoland/deno/blob/main/cli/node/mod.rs#L725 + * + * The idea is to emulate a CommonJS environment without having to modify + * the test files in any way + * + * Running with all permissions and unstable is recommended + * + * Usage: `deno run -A --unstable require.mjs my_commonjs_file.js` + */ + +import "./require.ts"; diff --git a/node/_tools/test.ts b/node/_tools/test.ts index 0e50b05e2f8e..a33381acffb5 100644 --- a/node/_tools/test.ts +++ b/node/_tools/test.ts @@ -88,7 +88,7 @@ for await (const path of testPaths) { console.log(`Error: "${path}" failed`); console.log( "You can repeat only this test with the command:", - magenta(`deno test -A node/_tools/test.ts -- ${path}`), + magenta(`deno test -A --unstable node/_tools/test.ts -- ${path}`), ); fail(decodedStderr); } diff --git a/node/_tools/test/parallel/test-cluster-child-index-dgram.js b/node/_tools/test/parallel/test-cluster-child-index-dgram.js new file mode 100644 index 000000000000..5f72f50ecd1a --- /dev/null +++ b/node/_tools/test/parallel/test-cluster-child-index-dgram.js @@ -0,0 +1,47 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually + +'use strict'; +const common = require('../common'); +const Countdown = require('../common/countdown'); +if (common.isWindows) + common.skip('dgram clustering is currently not supported on Windows.'); + +const cluster = require('cluster'); +const dgram = require('dgram'); + +// Test an edge case when using `cluster` and `dgram.Socket.bind()` +// the port of `0`. +const kPort = 0; + +function child() { + const kTime = 2; + const countdown = new Countdown(kTime * 2, () => { + process.exit(0); + }); + for (let i = 0; i < kTime; i += 1) { + const socket = new dgram.Socket('udp4'); + socket.bind(kPort, common.mustCall(() => { + // `process.nextTick()` or `socket2.close()` would throw + // ERR_SOCKET_DGRAM_NOT_RUNNING + process.nextTick(() => { + socket.close(countdown.dec()); + const socket2 = new dgram.Socket('udp4'); + socket2.bind(kPort, common.mustCall(() => { + process.nextTick(() => { + socket2.close(countdown.dec()); + }); + })); + }); + })); + } +} + +if (cluster.isMaster) + cluster.fork(__filename); +else + child(); diff --git a/node/_tools/test/parallel/test-cluster-child-index-net.js b/node/_tools/test/parallel/test-cluster-child-index-net.js new file mode 100644 index 000000000000..1847eeb2b506 --- /dev/null +++ b/node/_tools/test/parallel/test-cluster-child-index-net.js @@ -0,0 +1,38 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually + +'use strict'; +const common = require('../common'); +const Countdown = require('../common/countdown'); +const cluster = require('cluster'); +const net = require('net'); + +// Test an edge case when using `cluster` and `net.Server.listen()` to +// the port of `0`. +const kPort = 0; + +function child() { + const kTime = 2; + const countdown = new Countdown(kTime * 2, () => { + process.exit(0); + }); + for (let i = 0; i < kTime; i += 1) { + const server = net.createServer(); + server.listen(kPort, common.mustCall(() => { + server.close(countdown.dec()); + const server2 = net.createServer(); + server2.listen(kPort, common.mustCall(() => { + server2.close(countdown.dec()); + })); + })); + } +} + +if (cluster.isMaster) + cluster.fork(__filename); +else + child(); diff --git a/node/_tools/test/parallel/test-cluster-disconnect-idle-worker.js b/node/_tools/test/parallel/test-cluster-disconnect-idle-worker.js new file mode 100644 index 000000000000..05e4d409eb82 --- /dev/null +++ b/node/_tools/test/parallel/test-cluster-disconnect-idle-worker.js @@ -0,0 +1,41 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually + +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const cluster = require('cluster'); +const fork = cluster.fork; + +if (cluster.isPrimary) { + fork(); // It is intentionally called `fork` instead of + fork(); // `cluster.fork` to test that `this` is not used + cluster.disconnect(common.mustCall(() => { + assert.deepStrictEqual(Object.keys(cluster.workers), []); + })); +} diff --git a/node/_tools/test/parallel/test-cluster-disconnect-unshared-tcp.js b/node/_tools/test/parallel/test-cluster-disconnect-unshared-tcp.js new file mode 100644 index 000000000000..5feb6b778b89 --- /dev/null +++ b/node/_tools/test/parallel/test-cluster-disconnect-unshared-tcp.js @@ -0,0 +1,51 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually + +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +'use strict'; +require('../common'); +process.env.NODE_CLUSTER_SCHED_POLICY = 'none'; + +const cluster = require('cluster'); +const net = require('net'); + +if (cluster.isPrimary) { + const unbound = cluster.fork().on('online', bind); + + function bind() { + cluster.fork({ BOUND: 'y' }).on('listening', disconnect); + } + + function disconnect() { + unbound.disconnect(); + unbound.on('disconnect', cluster.disconnect); + } +} else if (process.env.BOUND === 'y') { + const source = net.createServer(); + + source.listen(0); +} diff --git a/node/_tools/test/parallel/test-cluster-disconnect-unshared-udp.js b/node/_tools/test/parallel/test-cluster-disconnect-unshared-udp.js new file mode 100644 index 000000000000..3480d6e29dea --- /dev/null +++ b/node/_tools/test/parallel/test-cluster-disconnect-unshared-udp.js @@ -0,0 +1,54 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually + +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +'use strict'; + +const common = require('../common'); + +if (common.isWindows) + common.skip('on windows, because clustered dgram is ENOTSUP'); + +const cluster = require('cluster'); +const dgram = require('dgram'); + +if (cluster.isPrimary) { + const unbound = cluster.fork().on('online', bind); + + function bind() { + cluster.fork({ BOUND: 'y' }).on('listening', disconnect); + } + + function disconnect() { + unbound.disconnect(); + unbound.on('disconnect', cluster.disconnect); + } +} else if (process.env.BOUND === 'y') { + const source = dgram.createSocket('udp4'); + + source.bind(0); +} diff --git a/node/_tools/test/parallel/test-cluster-rr-domain-listen.js b/node/_tools/test/parallel/test-cluster-rr-domain-listen.js new file mode 100644 index 000000000000..99e07fa2012f --- /dev/null +++ b/node/_tools/test/parallel/test-cluster-rr-domain-listen.js @@ -0,0 +1,58 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually + +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +'use strict'; +require('../common'); +const cluster = require('cluster'); +const domain = require('domain'); + +// RR is the default for v0.11.9+ so the following line is redundant: +// cluster.schedulingPolicy = cluster.SCHED_RR; + +if (cluster.isWorker) { + const d = domain.create(); + d.run(() => {}); + + const http = require('http'); + http.Server(() => {}).listen(0, '127.0.0.1'); + +} else if (cluster.isPrimary) { + + // Kill worker when listening + cluster.on('listening', function() { + worker.kill(); + }); + + // Kill process when worker is killed + cluster.on('exit', function() { + process.exit(0); + }); + + // Create worker + const worker = cluster.fork(); +} diff --git a/node/_tools/test/parallel/test-cluster-worker-isdead.js b/node/_tools/test/parallel/test-cluster-worker-isdead.js new file mode 100644 index 000000000000..d3fd4e36700f --- /dev/null +++ b/node/_tools/test/parallel/test-cluster-worker-isdead.js @@ -0,0 +1,39 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually + +'use strict'; +require('../common'); +const cluster = require('cluster'); +const assert = require('assert'); + +if (cluster.isPrimary) { + const worker = cluster.fork(); + let workerDead = worker.isDead(); + assert.ok(!workerDead, + `isDead() returned ${workerDead}. isDead() should return ` + + 'false right after the worker has been created.'); + + worker.on('exit', function() { + workerDead = worker.isDead(); + assert.ok(workerDead, + `isDead() returned ${workerDead}. After an event has been ` + + 'emitted, isDead should return true'); + }); + + worker.on('message', function(msg) { + if (msg === 'readyToDie') { + worker.kill(); + } + }); + +} else if (cluster.isWorker) { + const workerDead = cluster.worker.isDead(); + assert.ok(!workerDead, + `isDead() returned ${workerDead}. isDead() should return ` + + 'false when called from within a worker'); + process.send('readyToDie'); +} diff --git a/node/child_process.ts b/node/child_process.ts index 58b9c0add374..1baebf5c171e 100644 --- a/node/child_process.ts +++ b/node/child_process.ts @@ -41,7 +41,11 @@ import { convertToValidSignal, kEmptyObject } from "./internal/util.mjs"; const MAX_BUFFER = 1024 * 1024; -type ForkOptions = ChildProcessOptions; +export interface ForkOptions extends ChildProcessOptions { + execPath?: string | undefined; + execArgv?: string[] | undefined; + silent?: boolean | undefined; +} /** * Spawns a new Node.js process + fork. @@ -50,9 +54,15 @@ type ForkOptions = ChildProcessOptions; * @param option * @returns */ +export function fork(modulePath: string, options?: ForkOptions): ChildProcess; +export function fork( + modulePath: string, + args?: ReadonlyArray, + options?: ForkOptions, +): ChildProcess; export function fork( modulePath: string, - _args?: string[], + _args?: ReadonlyArray | ForkOptions, _options?: ForkOptions, ) { validateString(modulePath, "modulePath"); @@ -139,9 +149,10 @@ export function fork( options.shell = false; Object.assign(options.env ??= {}, { - // deno-lint-ignore no-explicit-any - DENO_DONT_USE_INTERNAL_NODE_COMPAT_STATE: (Deno as any).core.ops - .op_npm_process_state(), + DENO_DONT_USE_INTERNAL_NODE_COMPAT_STATE: ( + // deno-lint-ignore no-explicit-any + Deno as any + ).core.ops.op_npm_process_state(), }); return spawn(options.execPath, args, options); diff --git a/node/cluster.ts b/node/cluster.ts index 444f2ef55d5c..2ad41ea488fb 100644 --- a/node/cluster.ts +++ b/node/cluster.ts @@ -1,69 +1,53 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. // Copyright Joyent and Node contributors. All rights reserved. MIT license. -import { notImplemented } from "./_utils.ts"; +import { cluster } from "./internal/cluster/cluster.ts"; +import { initRoundRobinHandle } from "./internal/cluster/round_robin_handle.ts"; +import { initSharedHandle } from "./internal/cluster/shared_handle.ts"; +import { _createServerHandle, createServer } from "./net.ts"; -/** A Worker object contains all public information and method about a worker. - * In the primary it can be obtained using cluster.workers. In a worker it can - * be obtained using cluster.worker. - */ -export class Worker { - constructor() { - notImplemented("cluster.Worker.prototype.constructor"); - } -} -/** Calls .disconnect() on each worker in cluster.workers. */ -export function disconnected() { - notImplemented("cluster.disconnected"); -} -/** Spawn a new worker process. */ -export function fork() { - notImplemented("cluster.fork"); -} -/** True if the process is a primary. This is determined by - * the process.env.NODE_UNIQUE_ID. If process.env.NODE_UNIQUE_ID is undefined, - * then isPrimary is true. */ -export const isPrimary = undefined; -/** True if the process is not a primary (it is the negation of - * cluster.isPrimary). */ -export const isWorker = undefined; -/** Deprecated alias for cluster.isPrimary. details. */ -export const isMaster = isPrimary; -/** The scheduling policy, either cluster.SCHED_RR for round-robin or - * cluster.SCHED_NONE to leave it to the operating system. This is a global - * setting and effectively frozen once either the first worker is spawned, or - * .setupPrimary() is called, whichever comes first. */ -export const schedulingPolicy = undefined; -/** The settings object */ -export const settings = undefined; -/** Deprecated alias for .setupPrimary(). */ -export function setupMaster() { - notImplemented("cluster.setupMaster"); -} -/** setupPrimary is used to change the default 'fork' behavior. Once called, - * the settings will be present in cluster.settings. */ -export function setupPrimary() { - notImplemented("cluster.setupPrimary"); -} -/** A reference to the current worker object. Not available in the primary - * process. */ -export const worker = undefined; -/** A hash that stores the active worker objects, keyed by id field. Makes it - * easy to loop through all the workers. It is only available in the primary - * process. */ -export const workers = undefined; +// Lazily initializes the cluster *Handle classes. +// This trick is necessary for avoiding circular dependencies between +// net and cluster modules. +initRoundRobinHandle(createServer); +initSharedHandle(_createServerHandle); -export default { +const { + SCHED_NONE, + SCHED_RR, Worker, - disconnected, + _events, + _eventsCount, + _maxListeners, + disconnect, fork, + isMaster, isPrimary, isWorker, + schedulingPolicy, + settings, + setupMaster, + setupPrimary, + workers, +} = cluster; + +export { + _events, + _eventsCount, + _maxListeners, + disconnect, + fork, isMaster, + isPrimary, + isWorker, + SCHED_NONE, + SCHED_RR, schedulingPolicy, settings, setupMaster, setupPrimary, - worker, + Worker, workers, }; + +export default cluster; diff --git a/node/internal/child_process.ts b/node/internal/child_process.ts index 14e8743f18d5..5db945cdbf1c 100644 --- a/node/internal/child_process.ts +++ b/node/internal/child_process.ts @@ -42,6 +42,8 @@ import { kEmptyObject } from "./util.mjs"; import { getValidatedPath } from "./fs/utils.mjs"; import process from "../process.ts"; +export const kChannelHandle = Symbol("kChannelHandle"); + type NodeStdio = "pipe" | "overlapped" | "ignore" | "inherit" | "ipc"; type DenoStdio = "inherit" | "piped" | "null"; @@ -131,11 +133,7 @@ export class ChildProcess extends EventEmitter { #process!: Deno.ChildProcess; #spawned = deferred(); - constructor( - command: string, - args?: string[], - options?: ChildProcessOptions, - ) { + constructor(command: string, args?: string[], options?: ChildProcessOptions) { super(); const { @@ -152,11 +150,7 @@ export class ChildProcess extends EventEmitter { stderr = "pipe", _channel, // TODO(kt3k): handle this correctly ] = normalizeStdioOption(stdio); - const [cmd, cmdArgs] = buildCommand( - command, - args || [], - shell, - ); + const [cmd, cmdArgs] = buildCommand(command, args || [], shell); this.spawnfile = cmd; this.spawnargs = [cmd, ...cmdArgs]; @@ -269,10 +263,21 @@ export class ChildProcess extends EventEmitter { this.#process.unref(); } + get connected() { + warnNotImplemented("ChildProcess.prototype.connected"); + + return false; + } + disconnect() { warnNotImplemented("ChildProcess.prototype.disconnect"); } + /** https://nodejs.org/api/child_process.html#subprocesssendmessage-sendhandle-options-callback */ + send() { + warnNotImplemented("ChildProcess.prototype.send"); + } + async #_waitForChildStreamsToClose() { const promises = [] as Array>; if (this.stdin && !this.stdin.destroyed) { @@ -303,6 +308,22 @@ export class ChildProcess extends EventEmitter { } } +export function setupChannel( + // deno-lint-ignore no-explicit-any + _target: any, + // deno-lint-ignore no-explicit-any + _channel: any, + // deno-lint-ignore no-explicit-any + _serializationMode: any, +) { + notImplemented("child_process.setupChannel"); +} + +// deno-lint-ignore no-explicit-any +export function getValidStdio(_stdio: any, _sync: any) { + notImplemented("child_process.getValidStdio"); +} + const supportedNodeStdioTypes: NodeStdio[] = ["pipe", "ignore", "inherit"]; function toDenoStdio( pipe: NodeStdio | number | Stream | null | undefined, @@ -357,7 +378,9 @@ export interface ChildProcessOptions { /** * Environment variables passed to the child process. */ - env?: Record; + env?: + & InstanceType + & Record; /** * This option defines child process's stdio configuration. @@ -983,6 +1006,9 @@ function toDenoArgs(args: string[]): string[] { export default { ChildProcess, + kChannelHandle, + setupChannel, + getValidStdio, stdioStringToArray, spawnSync, }; diff --git a/node/internal/cluster/child.ts b/node/internal/cluster/child.ts new file mode 100644 index 000000000000..08f289585224 --- /dev/null +++ b/node/internal/cluster/child.ts @@ -0,0 +1,337 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license. + +import assert from "../assert.mjs"; +import path from "../../path.ts"; +import EventEmitter from "../../events.ts"; +import { ownerSymbol } from "../async_hooks.ts"; +import Worker from "./worker.ts"; +import { internal, sendHelper } from "./utils.ts"; +import process from "../../process.ts"; +import type { + Cluster as ICluster, + Message, + Worker as IWorker, + WorkerClass, +} from "./types.ts"; + +const cluster: ICluster = new EventEmitter() as ICluster; +const handles = new Map(); +const indexes = new Map(); + +const noop = Function.prototype; + +(cluster.isWorker as boolean) = true; +(cluster.isMaster as boolean) = false; // Deprecated alias. Must be same as isPrimary. +(cluster.isPrimary as boolean) = false; +(cluster.worker as null) = null; +(cluster.Worker as WorkerClass) = Worker; + +cluster._setupWorker = function () { + const worker = new Worker({ + id: +process.env.NODE_UNIQUE_ID | 0, + process, + state: "online", + }); + + (cluster.worker as IWorker) = worker; + + process.once("disconnect", () => { + worker.emit("disconnect"); + + if (!worker.exitedAfterDisconnect) { + // Unexpected disconnect, primary exited, or some such nastiness, so + // worker exits immediately. + process.exit(0); + } + }); + + process.on("internalMessage", internal(worker, onmessage)); + send({ act: "online" }); + + // deno-lint-ignore no-explicit-any + function onmessage(message: Message, handle: any) { + if (message.act === "newconn") { + onconnection(message, handle); + } else if (message.act === "disconnect") { + Reflect.apply(_disconnect, worker, [true]); + } + } +}; + +// `obj` is a net#Server or a dgram#Socket object. +cluster._getServer = function ( + // deno-lint-ignore no-explicit-any + obj: any, + options: { + address?: string | null; + port?: number | null; + addressType?: string | number | null; + fd?: number | null; + flags?: number | null; + }, + // deno-lint-ignore no-explicit-any + cb: (err: number, handle: any | null) => void, +) { + let address = options.address; + + // Resolve unix socket paths to absolute paths + if ( + options.port! < 0 && + typeof address === "string" && + process.platform !== "win32" + ) { + address = path.resolve(address); + } + + const indexesKey = [ + address, + options.port, + options.addressType, + options.fd, + ].join(":"); + + let indexSet = indexes.get(indexesKey); + + if (indexSet === undefined) { + indexSet = { nextIndex: 0, set: new Set() }; + indexes.set(indexesKey, indexSet); + } + + const index = indexSet.nextIndex++; + indexSet.set.add(index); + + const message = { + act: "queryServer", + index, + data: null, + ...options, + }; + + message.address = address; + + // Set custom data on handle (i.e. tls tickets key) + // deno-lint-ignore no-explicit-any + if ((obj as any)._getServerData) { + // deno-lint-ignore no-explicit-any + message.data = (obj as any)._getServerData(); + } + + // deno-lint-ignore no-explicit-any + send(message, (reply: Record | null, handle: any) => { + // deno-lint-ignore no-explicit-any + if (typeof (obj as any)._setServerData === "function") { + // deno-lint-ignore no-explicit-any + (obj as any)._setServerData(reply!.data); + } + + if (handle) { + // Shared listen socket + shared(reply!, { handle, indexesKey, index }, cb); + } else { + // Round-robin. + rr(reply!, { indexesKey, index }, cb); + } + }); + + obj.once("listening", () => { + cluster.worker!.state = "listening"; + const address = obj.address(); + message.act = "listening"; + message.port = address?.port || options.port; + send(message); + }); +}; + +function removeIndexesKey(indexesKey: string, index: number) { + const indexSet = indexes.get(indexesKey); + + if (!indexSet) { + return; + } + + indexSet.set.delete(index); + + if (indexSet.set.size === 0) { + indexes.delete(indexesKey); + } +} + +// Shared listen socket. +function shared( + message: Message, + { + handle, + indexesKey, + index, + }: // deno-lint-ignore no-explicit-any + { handle: any; indexesKey: string; index: number }, + // deno-lint-ignore no-explicit-any + cb: (errno: number, handle: any) => void, +) { + const key = message.key; + // Monkey-patch the close() method so we can keep track of when it's + // closed. Avoids resource leaks when the handle is short-lived. + const close = handle.close; + + handle.close = function () { + send({ act: "close", key }); + handles.delete(key); + removeIndexesKey(indexesKey, index); + + return Reflect.apply(close, handle, arguments); + }; + + assert(handles.has(key) === false); + handles.set(key, handle); + cb(message.errno, handle); +} + +// Round-robin. Master distributes handles across workers. +function rr( + message: Message, + { indexesKey, index }: { indexesKey: string; index: number }, + // deno-lint-ignore no-explicit-any + cb: (errno: number, handle: any | null) => void, +) { + if (message.errno) { + return cb(message.errno, null); + } + + let key = message.key; + + function listen(_backlog: number) { + return 0; + } + + function close() { + // lib/net.js treats server._handle.close() as effectively synchronous. + // That means there is a time window between the call to close() and + // the ack by the primary process in which we can still receive handles. + // onconnection() below handles that by sending those handles back to + // the primary. + if (key === undefined) { + return; + } + + send({ act: "close", key }); + handles.delete(key); + removeIndexesKey(indexesKey, index); + key = undefined; + } + + function getsockname(out: Record): number { + if (key) { + Object.assign(out, message.sockname); + } + + return 0; + } + + // Faux handle. Mimics a TCPWrap with just enough fidelity to get away + // with it. Fools net.Server into thinking that it's backed by a real + // handle. Use a noop function for ref() and unref() because the control + // channel is going to keep the worker alive anyway. + // deno-lint-ignore no-explicit-any + const handle: any = { + close, + listen, + ref: noop as () => void, + unref: noop as () => void, + }; + + if (message.sockname) { + handle.getsockname = getsockname; // TCP handles only. + } + + assert(handles.has(key) === false); + handles.set(key, handle); + cb(0, handle); +} + +// Round-robin connection. +// deno-lint-ignore no-explicit-any +function onconnection(message: Message, handle: any) { + const key = message.key; + const server = handles.get(key); + const accepted = server !== undefined; + + send({ ack: message.seq, accepted }); + + if (accepted) { + server.onconnection(0, handle); + } +} + +function send(message: Message, cb?: unknown) { + return sendHelper(process, message, null, cb); +} + +function _disconnect(this: IWorker, primaryInitiated: boolean) { + this.exitedAfterDisconnect = true; + let waitingCount = 1; + + function checkWaitingCount() { + waitingCount--; + + if (waitingCount === 0) { + // If disconnect is worker initiated, wait for ack to be sure + // exitedAfterDisconnect is properly set in the primary, otherwise, if + // it's primary initiated there's no need to send the + // exitedAfterDisconnect message + if (primaryInitiated) { + // TODO(cmorten): remove type cast once process interface is completed. + // deno-lint-ignore no-explicit-any + (process as any).disconnect(); + } else { + send({ act: "exitedAfterDisconnect" }, () => + // TODO(cmorten): remove type cast once process interface is completed. + // deno-lint-ignore no-explicit-any + (process as any).disconnect()); + } + } + } + + handles.forEach((handle) => { + waitingCount++; + + if (handle[ownerSymbol]) { + handle[ownerSymbol].close(checkWaitingCount); + } else { + handle.close(checkWaitingCount); + } + }); + + handles.clear(); + checkWaitingCount(); +} + +// Extend generic Worker with methods specific to worker processes. +Worker.prototype.disconnect = function () { + if (this.state !== "disconnecting" && this.state !== "destroying") { + this.state = "disconnecting"; + Reflect.apply(_disconnect, this, []); + } + + return this; +}; + +Worker.prototype.destroy = function () { + if (this.state === "destroying") { + return; + } + + this.exitedAfterDisconnect = true; + + if (!this.isConnected()) { + process.exit(0); + } else { + this.state = "destroying"; + // TODO(cmorten): remove type cast once process interface is completed. + // deno-lint-ignore no-explicit-any + send({ act: "exitedAfterDisconnect" }, () => (process as any).disconnect()); + process.once("disconnect", () => process.exit(0)); + } +}; + +export default cluster; diff --git a/node/internal/cluster/cluster.ts b/node/internal/cluster/cluster.ts new file mode 100644 index 000000000000..469ca2c25fca --- /dev/null +++ b/node/internal/cluster/cluster.ts @@ -0,0 +1,24 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. + +import process from "../../process.ts"; +import childCluster from "./child.ts"; +import primaryCluster from "./primary.ts"; + +export const cluster = "NODE_UNIQUE_ID" in process.env + ? childCluster + : primaryCluster; + +initializeClusterIPC(); + +// TODO: migrate to process pre-execution module if/when ported. +// See https://github.com/nodejs/node/blob/main/lib/internal/process/pre_execution.js#L507. +function initializeClusterIPC() { + if (process.env.NODE_UNIQUE_ID) { + cluster._setupWorker!(); + // Make sure it's not accidentally inherited by child processes. + delete process.env.NODE_UNIQUE_ID; + } +} + +export default cluster; diff --git a/node/internal/cluster/primary.ts b/node/internal/cluster/primary.ts new file mode 100644 index 000000000000..9a2e33515b2d --- /dev/null +++ b/node/internal/cluster/primary.ts @@ -0,0 +1,438 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license. + +import assert from "../assert.mjs"; +import { fork } from "../../child_process.ts"; +import path from "../../path.ts"; +import EventEmitter from "../../events.ts"; +import RoundRobinHandle from "./round_robin_handle.ts"; +import SharedHandle from "./shared_handle.ts"; +import Worker from "./worker.ts"; +import { internal, sendHelper } from "./utils.ts"; +import { validatePort } from "../validators.mjs"; +import process from "../../process.ts"; +import { ChildProcess } from "../child_process.ts"; +import { notImplemented } from "../../_utils.ts"; +import { ObjectAssign } from "../primordials.mjs"; +import type { + Cluster as ICluster, + ClusterSettings, + Message, + Worker as IWorker, + WorkerClass, +} from "./types.ts"; + +const cluster: ICluster = new EventEmitter() as ICluster; +const intercom = new EventEmitter(); +const SCHED_NONE = 1; +const SCHED_RR = 2; +const minPort = 1024; +const maxPort = 65535; + +const handles = new Map(); +(cluster.isWorker as boolean) = false; +(cluster.isMaster as boolean) = true; +(cluster.isPrimary as boolean) = true; +(cluster.Worker as WorkerClass) = Worker; +(cluster.workers as Record) = {}; +(cluster.settings as ClusterSettings) = {}; +(cluster.SCHED_NONE as number) = SCHED_NONE; // Leave it to the operating system. +(cluster.SCHED_RR as number) = SCHED_RR; // Primary distributes connections. + +let ids = 0; +let debugPortOffset = 1; +let initialized = false; +const envSchedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY; +let schedulingPolicy: number; + +if (envSchedulingPolicy === "rr") { + schedulingPolicy = SCHED_RR; +} else if (envSchedulingPolicy === "none") { + schedulingPolicy = SCHED_NONE; +} else if (process.platform === "win32") { + // Round-robin doesn't perform well on + // Windows due to the way IOCP is wired up. + schedulingPolicy = SCHED_NONE; +} else { + schedulingPolicy = SCHED_RR; +} + +cluster.schedulingPolicy = schedulingPolicy; + +cluster.setupPrimary = function (options?: ClusterSettings) { + const settings = { + args: process.argv.slice(2), + // TODO: remove extension replacement if/when have a better solution for + // resolving TypeScript files through Deno's Node module resolution. + // See https://github.com/denoland/deno/blob/main/cli/node/mod.rs#L725. + exec: process.argv[1].replace(/\.ts$/, ".mjs"), + execArgv: process.execArgv, + silent: false, + ...cluster.settings, + ...options, + }; + + // Tell V8 to write profile data for each process to a separate file. + // Without --logfile=v8-%p.log, everything ends up in a single, unusable + // file. (Unusable because what V8 logs are memory addresses and each + // process has its own memory mappings.) + if ( + settings.execArgv.some((s: string) => s.startsWith("--prof")) && + !settings.execArgv.some((s: string) => s.startsWith("--logfile=")) + ) { + settings.execArgv = [...settings.execArgv, "--logfile=v8-%p.log"]; + } + + (cluster.settings as ClusterSettings) = settings; + + if (initialized === true) { + return process.nextTick(setupSettingsNT, settings); + } + + initialized = true; + schedulingPolicy = cluster.schedulingPolicy; // Freeze policy. + + assert( + schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR, + `Bad cluster.schedulingPolicy: ${schedulingPolicy}`, + ); + + process.nextTick(setupSettingsNT, settings); + + process.on("internalMessage", (message: Message) => { + if (message.cmd !== "NODE_DEBUG_ENABLED") { + return; + } + + notImplemented("cluster.Cluster.prototype.setupPrimary debugProcess"); + }); +}; + +// Deprecated alias must be same as setupPrimary +cluster.setupMaster = cluster.setupPrimary; + +function setupSettingsNT(settings: ClusterSettings) { + cluster.emit("setup", settings); +} + +function createWorkerProcess(id: number, env?: Record) { + const workerEnv = ObjectAssign({}, process.env, env, { + NODE_UNIQUE_ID: `${id}`, + }) as + & InstanceType + & Record; + const execArgv = [...(cluster.settings.execArgv as string[])]; + const debugArgRegex = /--inspect(?:-brk|-port)?|--debug-port/; + const nodeOptions = process.env.NODE_OPTIONS || ""; + + if ( + execArgv.some((arg) => debugArgRegex.test(arg)) || + debugArgRegex.test(nodeOptions) + ) { + let inspectPort; + + if ("inspectPort" in cluster.settings) { + if (typeof cluster.settings.inspectPort === "function") { + inspectPort = cluster.settings.inspectPort(); + } else { + inspectPort = cluster.settings.inspectPort; + } + + validatePort(inspectPort); + } else { + inspectPort = (process as unknown as { debugPort: number }).debugPort + + debugPortOffset; + + if (inspectPort > maxPort) { + inspectPort = inspectPort - maxPort + minPort - 1; + } + + debugPortOffset++; + } + + execArgv.push(`--inspect-port=${inspectPort}`); + } + + return fork(cluster.settings.exec!, cluster.settings.args, { + cwd: cluster.settings.cwd, + env: workerEnv, + serialization: cluster.settings.serialization, + silent: cluster.settings.silent, + windowsHide: cluster.settings.windowsHide, + execArgv: execArgv, + stdio: cluster.settings.stdio, + gid: cluster.settings.gid, + uid: cluster.settings.uid, + }); +} + +function removeWorker(worker: IWorker) { + assert(worker); + delete cluster.workers![worker.id]; + + if (Object.keys(cluster.workers!).length === 0) { + assert(handles.size === 0, "Resource leak detected."); + intercom.emit("disconnect"); + } +} + +function removeHandlesForWorker(worker: IWorker) { + assert(worker); + + handles.forEach((handle, key) => { + if (handle.remove(worker)) handles.delete(key); + }); +} + +cluster.fork = function (env) { + cluster.setupPrimary(); + const id = ++ids; + const workerProcess = createWorkerProcess(id, env); + const worker = new Worker({ + id: id, + process: workerProcess, + }); + + worker.on( + "message", + // deno-lint-ignore no-explicit-any + function (this: IWorker, message: Message, handle: any) { + cluster.emit("message", this, message, handle); + }, + ); + + worker.process.once("exit", (exitCode: number, signalCode: number) => { + /* + * Remove the worker from the workers list only + * if it has disconnected, otherwise we might + * still want to access it. + */ + if (!worker.isConnected()) { + removeHandlesForWorker(worker); + removeWorker(worker); + } + + worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect; + worker.state = "dead"; + worker.emit("exit", exitCode, signalCode); + cluster.emit("exit", worker, exitCode, signalCode); + }); + + worker.process.once("disconnect", () => { + /* + * Now is a good time to remove the handles + * associated with this worker because it is + * not connected to the primary anymore. + */ + removeHandlesForWorker(worker); + + /* + * Remove the worker from the workers list only + * if its process has exited. Otherwise, we might + * still want to access it. + */ + if (worker.isDead()) { + removeWorker(worker); + } + + worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect; + worker.state = "disconnected"; + worker.emit("disconnect"); + cluster.emit("disconnect", worker); + }); + + worker.process.on("internalMessage", internal(worker, onmessage)); + process.nextTick(emitForkNT, worker); + cluster.workers![worker.id] = worker; + + return worker; +}; + +function emitForkNT(worker: IWorker) { + cluster.emit("fork", worker); +} + +cluster.disconnect = function (cb) { + const workers = Object.keys(cluster.workers!); + + if (workers.length === 0) { + process.nextTick(() => intercom.emit("disconnect")); + } else { + for (const worker of Object.values(cluster.workers!)) { + if (worker.isConnected()) { + worker.disconnect(); + } + } + } + + if (typeof cb === "function") { + intercom.once("disconnect", cb); + } +}; + +const methodMessageMapping = { + close, + exitedAfterDisconnect, + listening, + online, + queryServer, +}; + +// deno-lint-ignore no-explicit-any +function onmessage(this: IWorker, message: Message, _handle: any) { + const fn = + methodMessageMapping[message.act as keyof typeof methodMessageMapping]; + + if (typeof fn === "function") { + fn(this, message); + } +} + +function online(worker: IWorker) { + worker.state = "online"; + worker.emit("online"); + cluster.emit("online", worker); +} + +function exitedAfterDisconnect(worker: IWorker, message: Message) { + worker.exitedAfterDisconnect = true; + send(worker, { ack: message.seq }); +} + +function queryServer(worker: IWorker, message: Message) { + // Stop processing if worker already disconnecting + if (worker.exitedAfterDisconnect) { + return; + } + + const key = `${message.address}:${message.port}:${message.addressType}:` + + `${message.fd}:${message.index}`; + + let handle = handles.get(key); + + if (handle === undefined) { + let address = message.address; + + // Find shortest path for unix sockets because of the ~100 byte limit + if ( + message.port! < 0 && + typeof address === "string" && + process.platform !== "win32" + ) { + address = path.relative(process.cwd(), address); + + if (message.address!.length < address.length) { + address = message.address; + } + } + + // UDP is exempt from round-robin connection balancing for what should + // be obvious reasons: it's connectionless. There is nothing to send to + // the workers except raw datagrams and that's pointless. + if ( + schedulingPolicy !== SCHED_RR || + message.addressType === "udp4" || + message.addressType === "udp6" + ) { + handle = new SharedHandle(key, address!, message); + } else { + handle = new RoundRobinHandle(key, address!, message); + } + + handles.set(key, handle); + } + + if (!handle.data) { + handle.data = message.data; + } + + // Set custom server data + handle.add( + worker, + ( + errno: number, + reply: Record | null, + // deno-lint-ignore no-explicit-any + handle: any, + ) => { + const { data } = handles.get(key); + + if (errno) { + handles.delete(key); // Gives other workers a chance to retry. + } + + send( + worker, + { + errno, + key, + ack: message.seq, + data, + ...reply, + }, + handle, + ); + }, + ); +} + +function listening(worker: IWorker, message: Message) { + const info = { + addressType: message.addressType, + address: message.address, + port: message.port, + fd: message.fd, + }; + + worker.state = "listening"; + worker.emit("listening", info); + cluster.emit("listening", worker, info); +} + +// Server in worker is closing, remove from list. The handle may have been +// removed by a prior call to removeHandlesForWorker() so guard against that. +function close(worker: IWorker, message: Message) { + const key = message.key; + const handle = handles.get(key); + + if (handle && handle.remove(worker)) { + handles.delete(key); + } +} + +function send( + worker: IWorker, + message: Message, + // deno-lint-ignore no-explicit-any + handle?: any, + cb?: unknown, +) { + return sendHelper(worker.process, message, handle, cb); +} + +// Extend generic Worker with methods specific to the primary process. +Worker.prototype.disconnect = function (): IWorker { + this.exitedAfterDisconnect = true; + send(this, { act: "disconnect" }); + removeHandlesForWorker(this); + removeWorker(this); + + return this; +}; + +Worker.prototype.destroy = function (signo?: string): void { + const proc = this.process; + + signo = signo || "SIGTERM"; + + if (this.isConnected()) { + this.once("disconnect", () => (proc as ChildProcess).kill(signo)); + this.disconnect(); + + return; + } + + (proc as ChildProcess).kill(signo); +}; + +export default cluster; diff --git a/node/internal/cluster/round_robin_handle.ts b/node/internal/cluster/round_robin_handle.ts new file mode 100644 index 000000000000..b6262fdc9733 --- /dev/null +++ b/node/internal/cluster/round_robin_handle.ts @@ -0,0 +1,177 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license. + +import assert from "../assert.mjs"; +import { sendHelper } from "./utils.ts"; +import { constants } from "../../internal_binding/tcp_wrap.ts"; +import { append, init, isEmpty, peek, remove } from "../linkedlist.mjs"; +import type { Message, Worker } from "./types.ts"; + +// deno-lint-ignore no-var no-explicit-any +var RoundRobinHandle: any; + +// Lazily initializes the actual RoundRobinHandle class. +// This trick is necessary for avoiding circular dependencies between +// net and cluster modules. +// deno-lint-ignore no-explicit-any +export function initRoundRobinHandle(createServer: any) { + if (RoundRobinHandle) { + return; + } + + RoundRobinHandle = class RoundRobinHandle { + key: string; + all: Map; + free: Map; + // deno-lint-ignore no-explicit-any + handle: any = null; + // deno-lint-ignore no-explicit-any + handles: any; + // deno-lint-ignore no-explicit-any + server: any; + + constructor( + key: string, + address: string, + { port, fd, flags, backlog }: Message, + ) { + this.key = key; + this.all = new Map(); + this.free = new Map(); + this.handles = init(Object.create(null)); + this.handle = null; + this.server = createServer(assert.fail); + + if (fd! >= 0) { + this.server.listen({ fd, backlog }); + } else if (port! >= 0) { + this.server.listen({ + port, + host: address, + // Currently, net module only supports `ipv6Only` option in `flags`. + ipv6Only: Boolean(flags! & constants.UV_TCP_IPV6ONLY), + backlog, + }); + } else { + this.server.listen(address, backlog); // UNIX socket path. + } + + this.server.once("listening", () => { + this.handle = this.server!._handle; + // deno-lint-ignore no-explicit-any + this.handle!.onconnection = (err: number, handle?: any) => + this.distribute(err, handle); + this.server!._handle = null; + this.server = null; + }); + } + + add( + worker: Worker, + send: ( + errno: number | null, + reply: Record | null, + // deno-lint-ignore no-explicit-any + handle: any, + ) => void, + ) { + assert(this.all.has(worker.id) === false); + this.all.set(worker.id, worker); + + const done = () => { + if (this.handle.getsockname) { + const out = {}; + this.handle.getsockname(out); + send(null, { sockname: out }, null); + } else { + send(null, null, null); // UNIX socket. + } + + this.handoff(worker); // In case there are connections pending. + }; + + if (this.server === null) { + return done(); + } + + // Still busy binding. + this.server.once("listening", done); + // deno-lint-ignore no-explicit-any + this.server.once("error", (err: any) => { + send(err.errno, null, null); + }); + } + + remove(worker: Worker) { + const existed = this.all.delete(worker.id); + + if (!existed) { + return false; + } + + this.free.delete(worker.id); + + if (this.all.size !== 0) { + return false; + } + + while (!isEmpty(this.handles)) { + const handle = peek(this.handles); + handle.close(); + remove(handle); + } + + this.handle!.close(); + this.handle = null; + + return true; + } + + // deno-lint-ignore no-explicit-any + distribute(_err: number, handle?: any) { + append(this.handles, handle); + const [workerEntry] = this.free; // this.free is a Map + + if (Array.isArray(workerEntry)) { + const { 0: workerId, 1: worker } = workerEntry; + this.free.delete(workerId); + this.handoff(worker); + } + } + + handoff(worker: Worker) { + if (!this.all.has(worker.id)) { + return; // Worker is closing (or has closed) the server. + } + + const handle = peek(this.handles); + + if (handle === null) { + this.free.set(worker.id, worker); // Add to ready queue again. + + return; + } + + remove(handle); + + const message = { act: "newconn", key: this.key }; + + sendHelper( + worker.process, + message, + handle, + (reply: Record) => { + if (reply.accepted) { + handle.close(); + } else { + this.distribute(0, handle); // Worker is shutting down. Send to another. + } + + this.handoff(worker); + }, + ); + } + }; +} + +export { RoundRobinHandle as default }; diff --git a/node/internal/cluster/shared_handle.ts b/node/internal/cluster/shared_handle.ts new file mode 100644 index 000000000000..146e238006d4 --- /dev/null +++ b/node/internal/cluster/shared_handle.ts @@ -0,0 +1,91 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license. + +import assert from "../assert.mjs"; +import { _createSocketHandle } from "../dgram.ts"; +import type { Message, Worker } from "./types.ts"; + +// deno-lint-ignore no-var no-explicit-any +var SharedHandle: any; + +// Lazily initializes the actual SharedHandle class. +// This trick is necessary for avoiding circular dependencies between +// net and cluster modules. +// deno-lint-ignore no-explicit-any +export function initSharedHandle(_createServerHandle: any) { + if (SharedHandle) { + return; + } + + SharedHandle = class SharedHandle { + key: string; + workers: Map; + // deno-lint-ignore no-explicit-any + handle: any = null; + errno = 0; + + constructor( + key: string, + address: string, + { port, addressType, fd, flags }: Message, + ) { + this.key = key; + this.workers = new Map(); + this.handle = null; + this.errno = 0; + + let rval; + + if (addressType === "udp4" || addressType === "udp6") { + rval = _createSocketHandle(address, port!, addressType, fd!, flags!); + } else { + rval = _createServerHandle( + address, + port!, + addressType as number, + fd, + flags, + ); + } + + if (typeof rval === "number") { + this.errno = rval; + } else { + this.handle = rval; + } + } + + add( + worker: Worker, + send: ( + errno: number, + reply: Record | null, + // deno-lint-ignore no-explicit-any + handle: any, + ) => void, + ) { + assert(!this.workers.has(worker.id)); + this.workers.set(worker.id, worker); + send(this.errno, null, this.handle!); + } + + remove(worker: Worker) { + if (!this.workers.has(worker.id)) { + return false; + } + + this.workers.delete(worker.id); + + if (this.workers.size !== 0) { + return false; + } + + this.handle!.close(); + this.handle = null; + + return true; + } + }; +} + +export { SharedHandle as default }; diff --git a/node/internal/cluster/types.ts b/node/internal/cluster/types.ts new file mode 100644 index 000000000000..5af118d9db0d --- /dev/null +++ b/node/internal/cluster/types.ts @@ -0,0 +1,539 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license. + +import EventEmitter from "../../events.ts"; +import { ChildProcess } from "../child_process.ts"; +import { Process } from "../../process.ts"; +import type { ForkOptions } from "../../child_process.ts"; + +export interface Message { + // deno-lint-ignore no-explicit-any + [key: string]: any; +} + +export type Serializable = + | string + | Record + | number + | boolean + | bigint; + +export interface MessageOptions { + keepOpen?: boolean | undefined; +} + +export interface ClusterSettings extends ForkOptions { + exec?: string; + args?: string[]; + inspectPort?: number | (() => number); +} + +export interface Address { + address: string; + port: number; + addressType: number | "udp4" | "udp6"; // 4, 6, -1, "udp4", "udp6" +} + +export interface WorkerOptions { + id?: number; + process?: ChildProcess | Process; + state?: string; +} + +export interface WorkerClass extends Function { + new (options?: WorkerOptions | null): Worker; +} + +export interface Worker extends EventEmitter { + state: string; + + /** + * Each new worker is given its own unique id, this id is stored in the`id`. + * + * While a worker is alive, this is the key that indexes it in`cluster.workers`. + */ + id: number; + + /** + * All workers are created using `child_process.fork()`, the returned object + * from this function is stored as `.process`. In a worker, the global `process`is stored. + * + * See: `Child Process module`. + * + * Workers will call `process.exit(0)` if the `'disconnect'` event occurs + * on `process` and `.exitedAfterDisconnect` is not `true`. This protects against + * accidental disconnection. + */ + process: ChildProcess | Process; + + /** + * Send a message to a worker or primary, optionally with a handle. + * + * In the primary this sends a message to a specific worker. It is identical to `ChildProcess.send()`. + * + * In a worker this sends a message to the primary. It is identical to`process.send()`. + * + * This example will echo back all messages from the primary: + * + * ```js + * if (cluster.isPrimary) { + * const worker = cluster.fork(); + * worker.send('hi there'); + * + * } else if (cluster.isWorker) { + * process.on('message', (msg) => { + * process.send(msg); + * }); + * } + * ``` + * @param options The `options` argument, if present, is an object used to parameterize the sending of certain types of handles. `options` supports the following properties: + */ + send( + message: Serializable, + callback?: (error: Error | null) => void, + ): boolean; + send( + message: Serializable, + // deno-lint-ignore no-explicit-any + sendHandle: any, + callback?: (error: Error | null) => void, + ): boolean; + send( + message: Serializable, + // deno-lint-ignore no-explicit-any + sendHandle: any, + options?: MessageOptions, + callback?: (error: Error | null) => void, + ): boolean; + + /** + * This function will kill the worker. In the primary, it does this + * by disconnecting the `worker.process`, and once disconnected, killing + * with `signal`. In the worker, it does it by disconnecting the channel, + * and then exiting with code `0`. + * + * Because `kill()` attempts to gracefully disconnect the worker process, it is + * susceptible to waiting indefinitely for the disconnect to complete. For example, + * if the worker enters an infinite loop, a graceful disconnect will never occur. + * If the graceful disconnect behavior is not needed, use `worker.process.kill()`. + * + * Causes `.exitedAfterDisconnect` to be set. + * + * This method is aliased as `worker.destroy()` for backward compatibility. + * + * In a worker, `process.kill()` exists, but it is not this function; + * it is `kill()`. + * @param [signal='SIGTERM'] Name of the kill signal to send to the worker process. + */ + kill(signal?: string): void; + + destroy(signal?: string): void; + + /** + * In a worker, this function will close all servers, wait for the `'close'` event + * on those servers, and then disconnect the IPC channel. + * + * In the primary, an internal message is sent to the worker causing it to call`.disconnect()` on itself. + * + * Causes `.exitedAfterDisconnect` to be set. + * + * After a server is closed, it will no longer accept new connections, + * but connections may be accepted by any other listening worker. Existing + * connections will be allowed to close as usual. When no more connections exist, + * see `server.close()`, the IPC channel to the worker will close allowing it + * to die gracefully. + * + * The above applies _only_ to server connections, client connections are not + * automatically closed by workers, and disconnect does not wait for them to close + * before exiting. + * + * In a worker, `process.disconnect` exists, but it is not this function; + * it is `disconnect()`. + * + * Because long living server connections may block workers from disconnecting, it + * may be useful to send a message, so application specific actions may be taken to + * close them. It also may be useful to implement a timeout, killing a worker if + * the `'disconnect'` event has not been emitted after some time. + * + * ```js + * if (cluster.isPrimary) { + * const worker = cluster.fork(); + * let timeout; + * + * worker.on('listening', (address) => { + * worker.send('shutdown'); + * worker.disconnect(); + * timeout = setTimeout(() => { + * worker.kill(); + * }, 2000); + * }); + * + * worker.on('disconnect', () => { + * clearTimeout(timeout); + * }); + * + * } else if (cluster.isWorker) { + * const net = require('net'); + * const server = net.createServer((socket) => { + * // Connections never end + * }); + * + * server.listen(8000); + * + * process.on('message', (msg) => { + * if (msg === 'shutdown') { + * // Initiate graceful close of any connections to server + * } + * }); + * } + * ``` + * @return A reference to `worker`. + */ + disconnect(): void; + + /** + * This function returns `true` if the worker is connected to its primary via its + * IPC channel, `false` otherwise. A worker is connected to its primary after it + * has been created. It is disconnected after the `'disconnect'` event is emitted. + */ + isConnected(): boolean; + + /** + * This function returns `true` if the worker's process has terminated (either + * because of exiting or being signaled). Otherwise, it returns `false`. + * + * ```js + * import cluster from 'cluster'; + * import http from 'http'; + * import { cpus } from 'os'; + * import process from 'process'; + * + * const numCPUs = cpus().length; + * + * if (cluster.isPrimary) { + * console.log(`Primary ${process.pid} is running`); + * + * // Fork workers. + * for (let i = 0; i < numCPUs; i++) { + * cluster.fork(); + * } + * + * cluster.on('fork', (worker) => { + * console.log('worker is dead:', worker.isDead()); + * }); + * + * cluster.on('exit', (worker, code, signal) => { + * console.log('worker is dead:', worker.isDead()); + * }); + * } else { + * // Workers can share any TCP connection. In this case, it is an HTTP server. + * http.createServer((req, res) => { + * res.writeHead(200); + * res.end(`Current process\n ${process.pid}`); + * process.kill(process.pid); + * }).listen(8000); + * } + * ``` + */ + isDead(): boolean; + + /** + * This property is `true` if the worker exited due to `.kill()` or`.disconnect()`. If the worker exited any other way, it is `false`. If the + * worker has not exited, it is `undefined`. + * + * The boolean `worker.exitedAfterDisconnect` allows distinguishing between + * voluntary and accidental exit, the primary may choose not to respawn a worker + * based on this value. + * + * ```js + * cluster.on('exit', (worker, code, signal) => { + * if (worker.exitedAfterDisconnect === true) { + * console.log('Oh, it was just voluntary – no need to worry'); + * } + * }); + * + * // kill worker + * worker.kill(); + * ``` + */ + exitedAfterDisconnect?: boolean; + + /** + * events.EventEmitter + * 1. disconnect + * 2. error + * 3. exit + * 4. listening + * 5. message + * 6. online + */ + addListener(event: string, listener: (...args: unknown[]) => void): this; + addListener(event: "disconnect", listener: () => void): this; + addListener(event: "error", listener: (error: Error) => void): this; + addListener( + event: "exit", + listener: (code: number, signal: string) => void, + ): this; + addListener(event: "listening", listener: (address: Address) => void): this; + addListener( + event: "message", + // deno-lint-ignore no-explicit-any + listener: (message: unknown, handle: any) => void, + ): this; // the handle is a Socket or Server object, or undefined. + addListener(event: "online", listener: () => void): this; + + emit(event: string | symbol, ...args: unknown[]): boolean; + emit(event: "disconnect"): boolean; + emit(event: "error", error: Error): boolean; + emit(event: "exit", code: number, signal: string): boolean; + emit(event: "listening", address: Address): boolean; + // deno-lint-ignore no-explicit-any + emit(event: "message", message: unknown, handle: any): boolean; + emit(event: "online"): boolean; + + on(event: string, listener: (...args: unknown[]) => void): this; + on(event: "disconnect", listener: () => void): this; + on(event: "error", listener: (error: Error) => void): this; + on(event: "exit", listener: (code: number, signal: string) => void): this; + on(event: "listening", listener: (address: Address) => void): this; + // deno-lint-ignore no-explicit-any + on(event: "message", listener: (message: unknown, handle: any) => void): this; // the handle is a Socket or Server object, or undefined. + on(event: "online", listener: () => void): this; + + once(event: string, listener: (...args: unknown[]) => void): this; + once(event: "disconnect", listener: () => void): this; + once(event: "error", listener: (error: Error) => void): this; + once(event: "exit", listener: (code: number, signal: string) => void): this; + once(event: "listening", listener: (address: Address) => void): this; + once( + event: "message", + // deno-lint-ignore no-explicit-any + listener: (message: unknown, handle: any) => void, + ): this; // the handle is a Socket or Server object, or undefined. + once(event: "online", listener: () => void): this; + + prependListener(event: string, listener: (...args: unknown[]) => void): this; + prependListener(event: "disconnect", listener: () => void): this; + prependListener(event: "error", listener: (error: Error) => void): this; + prependListener( + event: "exit", + listener: (code: number, signal: string) => void, + ): this; + prependListener( + event: "listening", + listener: (address: Address) => void, + ): this; + prependListener( + event: "message", + // deno-lint-ignore no-explicit-any + listener: (message: unknown, handle: any) => void, + ): this; // the handle is a Socket or Server object, or undefined. + prependListener(event: "online", listener: () => void): this; + + prependOnceListener( + event: string, + listener: (...args: unknown[]) => void, + ): this; + prependOnceListener(event: "disconnect", listener: () => void): this; + prependOnceListener(event: "error", listener: (error: Error) => void): this; + prependOnceListener( + event: "exit", + listener: (code: number, signal: string) => void, + ): this; + prependOnceListener( + event: "listening", + listener: (address: Address) => void, + ): this; + prependOnceListener( + event: "message", + // deno-lint-ignore no-explicit-any + listener: (message: unknown, handle: any) => void, + ): this; // the handle is a Socket or Server object, or undefined. + prependOnceListener(event: "online", listener: () => void): this; +} + +export interface Cluster extends EventEmitter { + readonly isPrimary: boolean; + /** @deprecated use isPrimary. */ + readonly isMaster: boolean; + readonly isWorker: boolean; + readonly settings: ClusterSettings; + readonly worker?: Worker | null; + readonly Worker?: WorkerClass; + readonly workers?: Record; + readonly SCHED_NONE: number; + readonly SCHED_RR: number; + + schedulingPolicy: number; + + _setupWorker?: () => void; + + _getServer?: ( + // deno-lint-ignore no-explicit-any + obj: any, + options: { + address?: string | null; + port?: number | null; + addressType?: string | number | null; + fd?: number | null; + flags?: number | null; + }, + // deno-lint-ignore no-explicit-any + cb: (err: number, handle: any) => void, + ) => void; + + disconnect(callback?: () => void): void; + + fork(env?: Record): Worker; + + /** @deprecated - use setupPrimary. */ + setupMaster(settings?: ClusterSettings): void; + + /** + * `setupPrimary` is used to change the default 'fork' behavior. Once called, the settings will be present in cluster.settings. + */ + setupPrimary(settings?: ClusterSettings): void; + + /** + * events.EventEmitter + * 1. disconnect + * 2. exit + * 3. fork + * 4. listening + * 5. message + * 6. online + * 7. setup + */ + addListener(event: string, listener: (...args: unknown[]) => void): this; + addListener(event: "disconnect", listener: (worker: Worker) => void): this; + addListener( + event: "exit", + listener: (worker: Worker, code: number, signal: string) => void, + ): this; + addListener(event: "fork", listener: (worker: Worker) => void): this; + addListener( + event: "listening", + listener: (worker: Worker, address: Address) => void, + ): this; + addListener( + event: "message", + // deno-lint-ignore no-explicit-any + listener: (worker: Worker, message: unknown, handle: any) => void, + ): this; // the handle is a Socket or Server object, or undefined. + addListener(event: "online", listener: (worker: Worker) => void): this; + addListener( + event: "setup", + listener: (settings: ClusterSettings) => void, + ): this; + + emit(event: string | symbol, ...args: unknown[]): boolean; + emit(event: "disconnect", worker: Worker): boolean; + emit(event: "exit", worker: Worker, code: number, signal: string): boolean; + emit(event: "fork", worker: Worker): boolean; + emit(event: "listening", worker: Worker, address: Address): boolean; + emit( + event: "message", + worker: Worker, + message: unknown, + // deno-lint-ignore no-explicit-any + handle: any, + ): boolean; + emit(event: "online", worker: Worker): boolean; + emit(event: "setup", settings: ClusterSettings): boolean; + + on(event: string, listener: (...args: unknown[]) => void): this; + on(event: "disconnect", listener: (worker: Worker) => void): this; + on( + event: "exit", + listener: (worker: Worker, code: number, signal: string) => void, + ): this; + on(event: "fork", listener: (worker: Worker) => void): this; + on( + event: "listening", + listener: (worker: Worker, address: Address) => void, + ): this; + on( + event: "message", + // deno-lint-ignore no-explicit-any + listener: (worker: Worker, message: unknown, handle: any) => void, + ): this; // the handle is a Socket or Server object, or undefined. + on(event: "online", listener: (worker: Worker) => void): this; + on(event: "setup", listener: (settings: ClusterSettings) => void): this; + + once(event: string, listener: (...args: unknown[]) => void): this; + once(event: "disconnect", listener: (worker: Worker) => void): this; + once( + event: "exit", + listener: (worker: Worker, code: number, signal: string) => void, + ): this; + once(event: "fork", listener: (worker: Worker) => void): this; + once( + event: "listening", + listener: (worker: Worker, address: Address) => void, + ): this; + once( + event: "message", + // deno-lint-ignore no-explicit-any + listener: (worker: Worker, message: unknown, handle: any) => void, + ): this; // the handle is a Socket or Server object, or undefined. + once(event: "online", listener: (worker: Worker) => void): this; + once(event: "setup", listener: (settings: ClusterSettings) => void): this; + + prependListener(event: string, listener: (...args: unknown[]) => void): this; + prependListener( + event: "disconnect", + listener: (worker: Worker) => void, + ): this; + prependListener( + event: "exit", + listener: (worker: Worker, code: number, signal: string) => void, + ): this; + prependListener(event: "fork", listener: (worker: Worker) => void): this; + prependListener( + event: "listening", + listener: (worker: Worker, address: Address) => void, + ): this; + // the handle is a Socket or Server object, or undefined. + prependListener( + event: "message", + // deno-lint-ignore no-explicit-any + listener: (worker: Worker, message: unknown, handle?: any) => void, + ): this; + prependListener(event: "online", listener: (worker: Worker) => void): this; + prependListener( + event: "setup", + listener: (settings: ClusterSettings) => void, + ): this; + + prependOnceListener( + event: string, + listener: (...args: unknown[]) => void, + ): this; + prependOnceListener( + event: "disconnect", + listener: (worker: Worker) => void, + ): this; + prependOnceListener( + event: "exit", + listener: (worker: Worker, code: number, signal: string) => void, + ): this; + prependOnceListener(event: "fork", listener: (worker: Worker) => void): this; + prependOnceListener( + event: "listening", + listener: (worker: Worker, address: Address) => void, + ): this; + // the handle is a Socket or Server object, or undefined. + prependOnceListener( + event: "message", + // deno-lint-ignore no-explicit-any + listener: (worker: Worker, message: unknown, handle: any) => void, + ): this; + prependOnceListener( + event: "online", + listener: (worker: Worker) => void, + ): this; + prependOnceListener( + event: "setup", + listener: (settings: ClusterSettings) => void, + ): this; +} diff --git a/node/internal/cluster/utils.ts b/node/internal/cluster/utils.ts new file mode 100644 index 000000000000..2477812eb48f --- /dev/null +++ b/node/internal/cluster/utils.ts @@ -0,0 +1,72 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license. + +import { ChildProcess } from "../child_process.ts"; +import { Process } from "../../process.ts"; +import type { Message, Worker } from "./types.ts"; + +const callbacks = new Map(); +let seq = 0; + +export function sendHelper( + proc: ChildProcess | Process, + message: Message, + // deno-lint-ignore no-explicit-any + handle?: any, + cb?: unknown, +) { + // TODO(cmorten): remove type cast once ChildProcess implements `connected` + // property. + // deno-lint-ignore no-explicit-any + if (!(proc as any).connected) { + return false; + } + + // Mark message as internal. See INTERNAL_PREFIX + // in lib/internal/child_process.js + message = { cmd: "NODE_CLUSTER", ...message, seq }; + + if (typeof cb === "function") { + callbacks.set(seq, cb); + } + + seq += 1; + + // TODO(cmorten): remove type cast once ChildProcess implements `send` + // method. + // deno-lint-ignore no-explicit-any + return (proc as any).send(message, handle); +} + +// Returns an internalMessage listener that hands off normal messages +// to the callback but intercepts and redirects ACK messages. +export function internal( + worker: Worker, + // deno-lint-ignore no-explicit-any + cb: (message: Message, handle: any) => void, +) { + // deno-lint-ignore no-explicit-any + return function onInternalMessage(message: Message, _handle: any) { + if (message.cmd !== "NODE_CLUSTER") { + return; + } + + let fn = cb; + + if (message.ack !== undefined) { + const callback = callbacks.get(message.ack); + + if (callback !== undefined) { + fn = callback; + callbacks.delete(message.ack); + } + } + + Reflect.apply(fn, worker, arguments); + }; +} + +export default { + sendHelper, + internal, +}; diff --git a/node/internal/cluster/worker.ts b/node/internal/cluster/worker.ts new file mode 100644 index 000000000000..66f460745a67 --- /dev/null +++ b/node/internal/cluster/worker.ts @@ -0,0 +1,76 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license. + +import { EventEmitter } from "../../events.ts"; +import { ChildProcess } from "../child_process.ts"; +import { Process } from "../../process.ts"; +import type { Worker as IWorker, WorkerOptions } from "./types.ts"; + +// Common Worker implementation shared between the cluster primary and workers. +export class Worker extends EventEmitter implements IWorker { + state: string; + id: number; + process!: ChildProcess | Process; + exitedAfterDisconnect?: boolean; + + destroy(_signal?: string): void {} + disconnect(): void {} + + constructor(options?: WorkerOptions | null) { + super(); + + if (options === null || typeof options !== "object") { + options = {}; + } + + this.exitedAfterDisconnect = undefined; + + this.state = options.state || "none"; + this.id = options.id || 0; + + if (options.process) { + this.process = options.process; + this.process.on( + "error", + (code, signal) => { + console.log("Worker process error", code); + this.emit("error", code, signal); + }, + ); + this.process.on( + "message", + (message, handle) => { + console.log("Worker process message", message); + this.emit("message", message, handle); + }, + ); + } + } + + kill(): void { + return Reflect.apply(this.destroy, this, arguments); + } + + send(): boolean { + // TODO(cmorten): remove type cast once ChildProcess implements `send` + // method. + // deno-lint-ignore no-explicit-any + return Reflect.apply((this.process as any).send, this.process, arguments); + } + + isDead(): boolean { + return ( + this.process.exitCode != null || + (this.process as ChildProcess).signalCode != null + ); + } + + isConnected(): boolean { + // TODO(cmorten): remove type cast once ChildProcess implements `connected` + // property. + // deno-lint-ignore no-explicit-any + return (this.process as any).connected; + } +} + +export default Worker; diff --git a/node/internal/linkedlist.mjs b/node/internal/linkedlist.mjs new file mode 100644 index 000000000000..74e68f53323e --- /dev/null +++ b/node/internal/linkedlist.mjs @@ -0,0 +1,60 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license. + +export function init(list) { + list._idleNext = list; + list._idlePrev = list; + + return list; +} + +// Show the most idle item. +export function peek(list) { + if (list._idlePrev === list) { + return null; + } + + return list._idlePrev; +} + +// Remove an item from its list. +export function remove(item) { + if (item._idleNext) { + item._idleNext._idlePrev = item._idlePrev; + } + + if (item._idlePrev) { + item._idlePrev._idleNext = item._idleNext; + } + + item._idleNext = null; + item._idlePrev = null; +} + +// Remove an item from its list and place at the end. +export function append(list, item) { + if (item._idleNext || item._idlePrev) { + remove(item); + } + + // Items are linked with _idleNext -> (older) and _idlePrev -> (newer). + // Note: This linkage (next being older) may seem counter-intuitive at first. + item._idleNext = list._idleNext; + item._idlePrev = list; + + // The list _idleNext points to tail (newest) and _idlePrev to head (oldest). + list._idleNext._idlePrev = item; + list._idleNext = item; +} + +export function isEmpty(list) { + return list._idleNext === list; +} + +export default { + init, + peek, + remove, + append, + isEmpty, +}; diff --git a/node/module_all.ts b/node/module_all.ts index 4c96cdd24027..941a801bd345 100644 --- a/node/module_all.ts +++ b/node/module_all.ts @@ -28,6 +28,12 @@ import http2 from "./http2.ts"; import https from "./https.ts"; import inspector from "./inspector.ts"; import internalCp from "./internal/child_process.ts"; +import internalClusterChild from "./internal/cluster/child.ts"; +import internalClusterPrimary from "./internal/cluster/primary.ts"; +import internalClusterRoundRobinHandle from "./internal/cluster/round_robin_handle.ts"; +import internalClusterSharedHandle from "./internal/cluster/shared_handle.ts"; +import internalClusterUtils from "./internal/cluster/utils.ts"; +import internalClusterWorker from "./internal/cluster/worker.ts"; import internalCryptoCertificate from "./internal/crypto/certificate.ts"; import internalCryptoCipher from "./internal/crypto/cipher.ts"; import internalCryptoDiffiehellman from "./internal/crypto/diffiehellman.ts"; @@ -118,6 +124,12 @@ export default { https, inspector, "internal/child_process": internalCp, + "internal/cluster/child": internalClusterChild, + "internal/cluster/primary": internalClusterPrimary, + "internal/cluster/round_robin_handle": internalClusterRoundRobinHandle, + "internal/cluster/shared_handle": internalClusterSharedHandle, + "internal/cluster/utils": internalClusterUtils, + "internal/cluster/worker": internalClusterWorker, "internal/crypto/certificate": internalCryptoCertificate, "internal/crypto/cipher": internalCryptoCipher, "internal/crypto/diffiehellman": internalCryptoDiffiehellman, diff --git a/node/module_test.ts b/node/module_test.ts index df9d323926db..aa4be94c3363 100644 --- a/node/module_test.ts +++ b/node/module_test.ts @@ -144,6 +144,7 @@ Deno.test("requireErrorInEval", async function () { "run", "--unstable", "--allow-read", + "--allow-env", "./_module/cjs/test_cjs_import.js", ], cwd, diff --git a/node/net.ts b/node/net.ts index 4e7898e9ab53..4c4a88e60959 100644 --- a/node/net.ts +++ b/node/net.ts @@ -96,8 +96,17 @@ import { debuglog } from "./internal/util/debuglog.ts"; import type { DuplexOptions } from "./_stream.d.ts"; import type { BufferEncoding } from "./_global.d.ts"; import type { Abortable } from "./_events.d.ts"; +import { initRoundRobinHandle } from "./internal/cluster/round_robin_handle.ts"; +import { initSharedHandle } from "./internal/cluster/shared_handle.ts"; +import { cluster } from "./internal/cluster/cluster.ts"; import { channel } from "./diagnostics_channel.ts"; +// Lazily initializes the cluster *Handle classes. +// This trick is necessary for avoiding circular dependencies between +// net and cluster modules. +initRoundRobinHandle(createServer); +initSharedHandle(_createServerHandle); + let debug = debuglog("net", (fn) => { debug = fn; }); @@ -110,7 +119,7 @@ const kBytesWritten = Symbol("kBytesWritten"); const DEFAULT_IPV4_ADDR = "0.0.0.0"; const DEFAULT_IPV6_ADDR = "::"; -type Handle = TCP | Pipe; +export type Handle = TCP | Pipe; interface HandleOptions { pauseOnCreate?: boolean; @@ -180,7 +189,7 @@ interface IpcNetConnectOptions extends IpcSocketConnectOptions, SocketOptions { type NetConnectOptions = TcpNetConnectOptions | IpcNetConnectOptions; -interface AddressInfo { +export interface AddressInfo { address: string; family?: string; port: number; @@ -1660,16 +1669,7 @@ function _listenInCluster( ) { exclusive = !!exclusive; - // TODO(cmorten): here we deviate somewhat from the Node implementation which - // makes use of the https://nodejs.org/api/cluster.html module to run servers - // across a "cluster" of Node processes to take advantage of multi-core - // systems. - // - // Though Deno has has a Worker capability from which we could simulate this, - // for now we assert that we are _always_ on the primary process. - const isPrimary = true; - - if (isPrimary || exclusive) { + if (cluster.isPrimary || exclusive) { // Will create a new handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method @@ -1677,6 +1677,35 @@ function _listenInCluster( return; } + + const serverQuery = { + address, + port, + addressType, + fd, + flags, + backlog, + }; + + // Get the primary's server handle, and listen on it + // deno-lint-ignore no-explicit-any + cluster._getServer!(server, serverQuery, listenOnPrimaryHandle as any); + + function listenOnPrimaryHandle(err: number, handle: TCP) { + err = _checkBindError(err, port!, handle!); + + if (err) { + const ex = exceptionWithHostPort(err, "bind", address!, port!); + + return server.emit("error", ex); + } + + // Reuse primary's server handle + server._handle = handle; + // _listen2 sets up the listened handle, it is still named like this + // to avoid breaking code that wraps this method + server._listen2(address, port, addressType, backlog, fd, flags); + } } function _lookupAndListen( @@ -2538,11 +2567,16 @@ export class Server extends EventEmitter { * @param connectionListener Automatically set as a listener for the `"connection"` event. * @return A `net.Server`. */ +export function createServer(connectionListener?: ConnectionListener): Server; export function createServer( options?: ServerOptions, connectionListener?: ConnectionListener, +): Server; +export function createServer( + options?: ServerOptions | ConnectionListener, + connectionListener?: ConnectionListener, ): Server { - return new Server(options, connectionListener); + return new Server(options as ServerOptions, connectionListener); } export { isIP, isIPv4, isIPv6 }; diff --git a/node/process.ts b/node/process.ts index 021bd987b465..6b0c405010df 100644 --- a/node/process.ts +++ b/node/process.ts @@ -340,7 +340,7 @@ function uncaughtExceptionHandler(err: any, origin: string) { let execPath: string | null = null; -class Process extends EventEmitter { +export class Process extends EventEmitter { constructor() { super(); @@ -421,6 +421,18 @@ class Process extends EventEmitter { _exiting = _exiting; + /** https://nodejs.org/api/process.html#processconnected */ + get connected() { + warnNotImplemented("process.connected"); + + return false; + } + + /** https://nodejs.org/api/process.html#processsendmessage-sendhandle-options-callback */ + send() { + warnNotImplemented("process.send"); + } + /** https://nodejs.org/api/process.html#processexitcode_1 */ exitCode: undefined | number = undefined;