diff --git a/Cargo.lock b/Cargo.lock index f2044731a4f7be..953b8bce51ab04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1508,6 +1508,7 @@ dependencies = [ "libz-sys", "md-5", "md4", + "nix 0.26.2", "num-bigint", "num-bigint-dig", "num-integer", @@ -1518,6 +1519,7 @@ dependencies = [ "p384", "path-clean", "pbkdf2", + "pin-project-lite", "rand", "regex", "reqwest", @@ -1529,6 +1531,7 @@ dependencies = [ "sha-1", "sha2", "signature", + "simd-json", "tokio", "typenum", "url", @@ -2405,6 +2408,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fly-accept-encoding" version = "0.2.0" @@ -2617,8 +2629,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -2767,12 +2781,31 @@ dependencies = [ "tracing", ] +[[package]] +name = "halfbrown" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5681137554ddff44396e5f149892c769d45301dd9aa19c51602a89ee214cb0ec" +dependencies = [ + "hashbrown 0.13.2", + "serde", +] + [[package]] name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.14.3" @@ -3292,6 +3325,70 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "lexical-core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" +dependencies = [ + "lexical-parse-integer", + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-util" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc" +dependencies = [ + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" +dependencies = [ + "lexical-util", + "lexical-write-integer", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" +dependencies = [ + "lexical-util", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.150" @@ -4437,6 +4534,26 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "ref-cast" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acde58d073e9c79da00f2b5b84eed919c8326832648a5b109b3fce1bb1175280" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7473c2cfcf90008193dd0e3e16599455cb601a9fce322b5bb55de799664925" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "regex" version = "1.10.2" @@ -5036,6 +5153,22 @@ dependencies = [ "rand_core", ] +[[package]] +name = "simd-json" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a3720326b20bf5b95b72dbbd133caae7e0dcf71eae8f6e6656e71a7e5c9aaa" +dependencies = [ + "getrandom", + "halfbrown", + "lexical-core", + "ref-cast", + "serde", + "serde_json", + "simdutf8", + "value-trait", +] + [[package]] name = "simdutf8" version = "0.1.4" @@ -6388,6 +6521,18 @@ dependencies = [ "which", ] +[[package]] +name = "value-trait" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea87257cfcbedcb9444eda79c59fdfea71217e6305afee8ee33f500375c2ac97" +dependencies = [ + "float-cmp", + "halfbrown", + "itoa", + "ryu", +] + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/cli/args/mod.rs b/cli/args/mod.rs index c65745e297a641..17711371a4b6bd 100644 --- a/cli/args/mod.rs +++ b/cli/args/mod.rs @@ -939,6 +939,17 @@ impl CliOptions { .map(Some) } + pub fn node_ipc_fd(&self) -> Option { + let maybe_node_channel_fd = std::env::var("DENO_CHANNEL_FD").ok(); + if let Some(node_channel_fd) = maybe_node_channel_fd { + // Remove so that child processes don't inherit this environment variable. + std::env::remove_var("DENO_CHANNEL_FD"); + node_channel_fd.parse::().ok() + } else { + None + } + } + pub fn resolve_main_module(&self) -> Result { match &self.flags.subcommand { DenoSubcommand::Bundle(bundle_flags) => { diff --git a/cli/factory.rs b/cli/factory.rs index 5db09767cf8298..0b21f6ecad2318 100644 --- a/cli/factory.rs +++ b/cli/factory.rs @@ -672,6 +672,7 @@ impl CliFactory { self.maybe_lockfile().clone(), self.feature_checker().clone(), self.create_cli_main_worker_options()?, + self.options.node_ipc_fd(), )) } diff --git a/cli/standalone/mod.rs b/cli/standalone/mod.rs index a6bf1286240c1e..63f014118cfe38 100644 --- a/cli/standalone/mod.rs +++ b/cli/standalone/mod.rs @@ -530,6 +530,7 @@ pub async fn run( unstable: metadata.unstable, maybe_root_package_json_deps: package_json_deps_provider.deps().cloned(), }, + None, ); v8_set_flags(construct_v8_flags(&[], &metadata.v8_flags, vec![])); diff --git a/cli/tests/node_compat/config.jsonc b/cli/tests/node_compat/config.jsonc index 943be97598350f..a29d75a1915b0e 100644 --- a/cli/tests/node_compat/config.jsonc +++ b/cli/tests/node_compat/config.jsonc @@ -38,6 +38,10 @@ "test-child-process-execfile.js", "test-child-process-execsync-maxbuf.js", "test-child-process-exit-code.js", + // TODO(littledivy): windows ipc streams not yet implemented + "test-child-process-fork-ref.js", + "test-child-process-fork-ref2.js", + "test-child-process-ipc-next-tick.js", "test-child-process-ipc.js", "test-child-process-spawnsync-env.js", "test-child-process-stdio-inherit.js", @@ -109,9 +113,7 @@ "test-zlib-zero-windowBits.js" ], "pummel": [], - "sequential": [ - "test-child-process-exit.js" - ] + "sequential": ["test-child-process-exit.js"] }, "tests": { "common": [ @@ -138,11 +140,7 @@ "print-chars.js", "x.txt" ], - "fixtures/keys": [ - "agent1-cert.pem", - "agent1-key.pem", - "ca1-cert.pem" - ], + "fixtures/keys": ["agent1-cert.pem", "agent1-key.pem", "ca1-cert.pem"], "internet": [ "test-dns-any.js", "test-dns-idna2008.js", @@ -695,9 +693,7 @@ "test-tty-stdout-end.js" ], "pummel": [], - "sequential": [ - "test-child-process-exit.js" - ] + "sequential": ["test-child-process-exit.js"] }, "windowsIgnore": { "parallel": [ diff --git a/cli/tests/node_compat/test/parallel/test-child-process-fork-ref.js b/cli/tests/node_compat/test/parallel/test-child-process-fork-ref.js new file mode 100644 index 00000000000000..37c186af8307ad --- /dev/null +++ b/cli/tests/node_compat/test/parallel/test-child-process-fork-ref.js @@ -0,0 +1,72 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by `tools/node_compat/setup.ts`. Do not modify this file manually. + +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +'use strict'; + +// Ignore on Windows. +if (process.platform === 'win32') { + process.exit(0); +} + +require('../common'); +const assert = require('assert'); +const fork = require('child_process').fork; + +if (process.argv[2] === 'child') { + process.send('1'); + + // Check that child don't instantly die + setTimeout(function() { + process.send('2'); + }, 200); + + process.on('disconnect', function() { + process.stdout.write('3'); + }); + +} else { + const child = fork(__filename, ['child'], { silent: true }); + + const ipc = []; + let stdout = ''; + + child.on('message', function(msg) { + ipc.push(msg); + + if (msg === '2') child.disconnect(); + }); + + child.stdout.on('data', function(chunk) { + stdout += chunk; + }); + + child.once('exit', function() { + assert.deepStrictEqual(ipc, ['1', '2']); + assert.strictEqual(stdout, '3'); + }); +} diff --git a/cli/tests/node_compat/test/parallel/test-child-process-fork-ref2.js b/cli/tests/node_compat/test/parallel/test-child-process-fork-ref2.js new file mode 100644 index 00000000000000..da59d9378f3f6d --- /dev/null +++ b/cli/tests/node_compat/test/parallel/test-child-process-fork-ref2.js @@ -0,0 +1,63 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by `tools/node_compat/setup.ts`. Do not modify this file manually. + +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +'use strict'; + +// Ignore on Windows. +if (process.platform === 'win32') { + process.exit(0); +} + +const { + mustCall, + mustNotCall, + platformTimeout, +} = require('../common'); +const fork = require('child_process').fork; +const debug = require('util').debuglog('test'); + +if (process.argv[2] === 'child') { + debug('child -> call disconnect'); + process.disconnect(); + + setTimeout(() => { + debug('child -> will this keep it alive?'); + process.on('message', mustNotCall()); + }, platformTimeout(400)); + +} else { + const child = fork(__filename, ['child']); + + child.on('disconnect', mustCall(() => { + debug('parent -> disconnect'); + })); + + child.once('exit', mustCall(() => { + debug('parent -> exit'); + })); +} diff --git a/cli/tests/node_compat/test/parallel/test-child-process-ipc-next-tick.js b/cli/tests/node_compat/test/parallel/test-child-process-ipc-next-tick.js new file mode 100644 index 00000000000000..d255a0a64018d7 --- /dev/null +++ b/cli/tests/node_compat/test/parallel/test-child-process-ipc-next-tick.js @@ -0,0 +1,52 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by `tools/node_compat/setup.ts`. Do not modify this file manually. + +'use strict'; + +// Ignore on Windows. +if (process.platform === 'win32') { + process.exit(0); +} + +const common = require('../common'); +const assert = require('assert'); +const cp = require('child_process'); +const NUM_MESSAGES = 10; +const values = []; + +for (let i = 0; i < NUM_MESSAGES; ++i) { + values[i] = i; +} + +if (process.argv[2] === 'child') { + const received = values.map(() => { return false; }); + + process.on('uncaughtException', common.mustCall((err) => { + received[err] = true; + const done = received.every((element) => { return element === true; }); + + if (done) + process.disconnect(); + }, NUM_MESSAGES)); + + process.on('message', (msg) => { + // If messages are handled synchronously, throwing should break the IPC + // message processing. + throw msg; + }); + + process.send('ready'); +} else { + const child = cp.fork(__filename, ['child']); + + child.on('message', common.mustCall((msg) => { + assert.strictEqual(msg, 'ready'); + values.forEach((value) => { + child.send(value); + }); + })); +} diff --git a/cli/worker.rs b/cli/worker.rs index ce9c057014b28f..22e534e1d98b40 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -124,6 +124,7 @@ struct SharedWorkerState { maybe_inspector_server: Option>, maybe_lockfile: Option>>, feature_checker: Arc, + node_ipc: Option, } impl SharedWorkerState { @@ -415,6 +416,7 @@ impl CliMainWorkerFactory { maybe_lockfile: Option>>, feature_checker: Arc, options: CliMainWorkerOptions, + node_ipc: Option, ) -> Self { Self { shared: Arc::new(SharedWorkerState { @@ -435,6 +437,7 @@ impl CliMainWorkerFactory { maybe_inspector_server, maybe_lockfile, feature_checker, + node_ipc, }), } } @@ -596,6 +599,7 @@ impl CliMainWorkerFactory { .options .maybe_binary_npm_command_name .clone(), + node_ipc_fd: shared.node_ipc, }, extensions: custom_extensions, startup_snapshot: crate::js::deno_isolate_init(), @@ -793,6 +797,7 @@ fn create_web_worker_callback( .options .maybe_binary_npm_command_name .clone(), + node_ipc_fd: None, }, extensions: vec![], startup_snapshot: crate::js::deno_isolate_init(), diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml index 1393c82c093d59..e5f9841457cac6 100644 --- a/ext/node/Cargo.toml +++ b/ext/node/Cargo.toml @@ -44,6 +44,7 @@ libc.workspace = true libz-sys.workspace = true md-5 = "0.10.5" md4 = "0.10.2" +nix.workspace = true num-bigint.workspace = true num-bigint-dig = "0.8.2" num-integer = "0.1.45" @@ -54,6 +55,7 @@ p256.workspace = true p384.workspace = true path-clean = "=0.1.0" pbkdf2 = "0.12.1" +pin-project-lite = "0.2.13" rand.workspace = true regex.workspace = true reqwest.workspace = true @@ -65,6 +67,7 @@ serde = "1.0.149" sha-1 = "0.10.0" sha2.workspace = true signature.workspace = true +simd-json = "0.13.4" tokio.workspace = true typenum = "1.15.0" url.workspace = true diff --git a/ext/node/benchmarks/child_process_ipc.mjs b/ext/node/benchmarks/child_process_ipc.mjs new file mode 100644 index 00000000000000..0486972dc3b7b2 --- /dev/null +++ b/ext/node/benchmarks/child_process_ipc.mjs @@ -0,0 +1,64 @@ +import { fork } from "node:child_process"; +import process from "node:process"; +import { setImmediate } from "node:timers"; + +if (process.env.CHILD) { + const len = +process.env.CHILD; + const msg = ".".repeat(len); + const send = () => { + while (process.send(msg)); + // Wait: backlog of unsent messages exceeds threshold + setImmediate(send); + }; + send(); +} else { + function main(dur, len) { + const p = new Promise((resolve) => { + const start = performance.now(); + + const options = { + "stdio": ["inherit", "inherit", "inherit", "ipc"], + "env": { "CHILD": len.toString() }, + }; + const path = new URL("child_process_ipc.mjs", import.meta.url).pathname; + const child = fork( + path, + options, + ); + + let bytes = 0; + let total = 0; + child.on("message", (msg) => { + bytes += msg.length; + total += 1; + }); + + setTimeout(() => { + child.kill(); + const end = performance.now(); + const mb = bytes / 1024 / 1024; + const sec = (end - start) / 1000; + const mbps = mb / sec; + console.log(`${len} bytes: ${mbps.toFixed(2)} MB/s`); + console.log(`${total} messages`); + resolve(); + }, dur * 1000); + }); + return p; + } + + const len = [ + 64, + 256, + 1024, + 4096, + 16384, + 65536, + 65536 << 4, + 65536 << 6 - 1, + ]; + + for (const l of len) { + await main(5, l); + } +} diff --git a/ext/node/lib.rs b/ext/node/lib.rs index 56f4b0ee075dfd..77f01b3d38a48f 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -312,6 +312,9 @@ deno_core::extension!(deno_node, ops::require::op_require_break_on_next_statement, ops::util::op_node_guess_handle_type, ops::crypto::op_node_create_private_key, + ops::ipc::op_node_ipc_pipe, + ops::ipc::op_node_ipc_write, + ops::ipc::op_node_ipc_read, ], esm_entry_point = "ext:deno_node/02_init.js", esm = [ diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs new file mode 100644 index 00000000000000..d1aeeb40c1f9cf --- /dev/null +++ b/ext/node/ops/ipc.rs @@ -0,0 +1,504 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +#[cfg(unix)] +pub use unix::*; + +#[cfg(windows)] +pub use windows::*; + +#[cfg(unix)] +mod unix { + use std::cell::RefCell; + use std::future::Future; + use std::io; + use std::mem; + use std::os::fd::FromRawFd; + use std::os::fd::RawFd; + use std::pin::Pin; + use std::rc::Rc; + use std::task::Context; + use std::task::Poll; + + use deno_core::error::bad_resource_id; + use deno_core::error::AnyError; + use deno_core::op2; + use deno_core::serde_json; + use deno_core::AsyncRefCell; + use deno_core::CancelFuture; + use deno_core::CancelHandle; + use deno_core::OpState; + use deno_core::RcRef; + use deno_core::ResourceId; + use pin_project_lite::pin_project; + use tokio::io::AsyncBufRead; + use tokio::io::AsyncWriteExt; + use tokio::io::BufReader; + use tokio::net::unix::OwnedReadHalf; + use tokio::net::unix::OwnedWriteHalf; + use tokio::net::UnixStream; + + #[op2(fast)] + #[smi] + pub fn op_node_ipc_pipe( + state: &mut OpState, + #[smi] fd: i32, + ) -> Result { + Ok(state.resource_table.add(IpcJsonStreamResource::new(fd)?)) + } + + #[op2(async)] + pub async fn op_node_ipc_write( + state: Rc>, + #[smi] rid: ResourceId, + #[serde] value: serde_json::Value, + ) -> Result<(), AnyError> { + let stream = state + .borrow() + .resource_table + .get::(rid) + .map_err(|_| bad_resource_id())?; + stream.write_msg(value).await?; + Ok(()) + } + + #[op2(async)] + #[serde] + pub async fn op_node_ipc_read( + state: Rc>, + #[smi] rid: ResourceId, + ) -> Result { + let stream = state + .borrow() + .resource_table + .get::(rid) + .map_err(|_| bad_resource_id())?; + + let cancel = stream.cancel.clone(); + let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await; + let msgs = stream.read_msg().or_cancel(cancel).await??; + Ok(msgs) + } + + struct IpcJsonStreamResource { + read_half: AsyncRefCell, + write_half: AsyncRefCell, + cancel: Rc, + } + + impl deno_core::Resource for IpcJsonStreamResource { + fn close(self: Rc) { + self.cancel.cancel(); + } + } + + impl IpcJsonStreamResource { + fn new(stream: RawFd) -> Result { + // Safety: The fd is part of a pair of connected sockets create by child process + // implementation. + let unix_stream = UnixStream::from_std(unsafe { + std::os::unix::net::UnixStream::from_raw_fd(stream) + })?; + let (read_half, write_half) = unix_stream.into_split(); + Ok(Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + }) + } + + #[cfg(test)] + fn from_unix_stream(stream: UnixStream) -> Self { + let (read_half, write_half) = stream.into_split(); + Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + } + } + + async fn write_msg( + self: Rc, + msg: serde_json::Value, + ) -> Result<(), AnyError> { + let mut write_half = + RcRef::map(self, |r| &r.write_half).borrow_mut().await; + // Perf note: We do not benefit from writev here because + // we are always allocating a buffer for serialization anyways. + let mut buf = Vec::new(); + serde_json::to_writer(&mut buf, &msg)?; + buf.push(b'\n'); + write_half.write_all(&buf).await?; + Ok(()) + } + } + + #[inline] + fn memchr(needle: u8, haystack: &[u8]) -> Option { + #[cfg(all(target_os = "macos", target_arch = "aarch64"))] + // Safety: haystack of valid length. neon_memchr can handle unaligned + // data. + return unsafe { neon::neon_memchr(haystack, needle, haystack.len()) }; + + #[cfg(not(all(target_os = "macos", target_arch = "aarch64")))] + return haystack.iter().position(|&b| b == needle); + } + + // Initial capacity of the buffered reader and the JSON backing buffer. + // + // This is a tradeoff between memory usage and performance on large messages. + // + // 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message. + const INITIAL_CAPACITY: usize = 1024 * 64; + + // JSON serialization stream over IPC pipe. + // + // `\n` is used as a delimiter between messages. + struct IpcJsonStream { + pipe: BufReader, + buffer: Vec, + } + + impl IpcJsonStream { + fn new(pipe: OwnedReadHalf) -> Self { + Self { + pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), + buffer: Vec::with_capacity(INITIAL_CAPACITY), + } + } + + async fn read_msg(&mut self) -> Result { + let mut json = None; + let nread = + read_msg_inner(&mut self.pipe, &mut self.buffer, &mut json).await?; + if nread == 0 { + // EOF. + return Ok(serde_json::Value::Null); + } + + let json = match json { + Some(v) => v, + None => { + // Took more than a single read and some buffering. + simd_json::from_slice(&mut self.buffer[..nread])? + } + }; + + // Safety: Same as `Vec::clear` but without the `drop_in_place` for + // each element (nop for u8). Capacity remains the same. + unsafe { + self.buffer.set_len(0); + } + + Ok(json) + } + } + + pin_project! { + #[must_use = "futures do nothing unless you `.await` or poll them"] + struct ReadMsgInner<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut Vec, + json: &'a mut Option, + // The number of bytes appended to buf. This can be less than buf.len() if + // the buffer was not empty when the operation was started. + read: usize, + } + } + + fn read_msg_inner<'a, R>( + reader: &'a mut R, + buf: &'a mut Vec, + json: &'a mut Option, + ) -> ReadMsgInner<'a, R> + where + R: AsyncBufRead + ?Sized + Unpin, + { + ReadMsgInner { + reader, + buf, + json, + read: 0, + } + } + + fn read_msg_internal( + mut reader: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut Vec, + json: &mut Option, + read: &mut usize, + ) -> Poll> { + loop { + let (done, used) = { + let available = match reader.as_mut().poll_fill_buf(cx) { + std::task::Poll::Ready(t) => t?, + std::task::Poll::Pending => return std::task::Poll::Pending, + }; + + if let Some(i) = memchr(b'\n', available) { + if *read == 0 { + // Fast path: parse and put into the json slot directly. + // + // Safety: It is ok to overwrite the contents because + // we don't need to copy it into the buffer and the length will be reset. + let available = unsafe { + std::slice::from_raw_parts_mut( + available.as_ptr() as *mut u8, + available.len(), + ) + }; + json.replace( + simd_json::from_slice(&mut available[..i + 1]) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?, + ); + } else { + // This is not the first read, so we have to copy the data + // to make it contiguous. + buf.extend_from_slice(&available[..=i]); + } + (true, i + 1) + } else { + buf.extend_from_slice(available); + (false, available.len()) + } + }; + + reader.as_mut().consume(used); + *read += used; + if done || used == 0 { + return Poll::Ready(Ok(mem::replace(read, 0))); + } + } + } + + impl Future for ReadMsgInner<'_, R> { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + read_msg_internal(Pin::new(*me.reader), cx, me.buf, me.json, me.read) + } + } + + #[cfg(all(target_os = "macos", target_arch = "aarch64"))] + mod neon { + use std::arch::aarch64::*; + + pub unsafe fn neon_memchr( + str: &[u8], + c: u8, + length: usize, + ) -> Option { + let end = str.as_ptr().wrapping_add(length); + + // Alignment handling + let mut ptr = str.as_ptr(); + while ptr < end && (ptr as usize) & 0xF != 0 { + if *ptr == c { + return Some(ptr as usize - str.as_ptr() as usize); + } + ptr = ptr.wrapping_add(1); + } + + let search_char = vdupq_n_u8(c); + + while ptr.wrapping_add(16) <= end { + let chunk = vld1q_u8(ptr); + let comparison = vceqq_u8(chunk, search_char); + + // Check first 64 bits + let result0 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 0); + if result0 != 0 { + return Some( + (ptr as usize - str.as_ptr() as usize) + + result0.trailing_zeros() as usize / 8, + ); + } + + // Check second 64 bits + let result1 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 1); + if result1 != 0 { + return Some( + (ptr as usize - str.as_ptr() as usize) + + 8 + + result1.trailing_zeros() as usize / 8, + ); + } + + ptr = ptr.wrapping_add(16); + } + + // Handle remaining unaligned characters + while ptr < end { + if *ptr == c { + return Some(ptr as usize - str.as_ptr() as usize); + } + ptr = ptr.wrapping_add(1); + } + + None + } + } + + #[cfg(test)] + mod tests { + use super::IpcJsonStreamResource; + use deno_core::serde_json; + use deno_core::serde_json::json; + use deno_core::RcRef; + use std::rc::Rc; + + #[tokio::test] + async fn bench_ipc() -> Result<(), Box> { + // A simple round trip benchmark for quick dev feedback. + // + // Only ran when the env var is set. + if std::env::var_os("BENCH_IPC_DENO").is_none() { + return Ok(()); + } + + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + use tokio::io::AsyncWriteExt; + + let size = 1024 * 1024; + + let stri = "x".repeat(size); + let data = format!("\"{}\"\n", stri); + for _ in 0..100 { + fd2.write_all(data.as_bytes()).await?; + } + Ok::<_, std::io::Error>(()) + }); + + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + + let start = std::time::Instant::now(); + let mut bytes = 0; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + loop { + let msgs = ipc.read_msg().await?; + if msgs == serde_json::Value::Null { + break; + } + bytes += msgs.as_str().unwrap().len(); + if start.elapsed().as_secs() > 5 { + break; + } + } + let elapsed = start.elapsed(); + let mb = bytes as f64 / 1024.0 / 1024.0; + println!("{} mb/s", mb / elapsed.as_secs_f64()); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json() -> Result<(), Box> { + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + let mut buf = [0u8; 1024]; + let n = fd2.read(&mut buf).await?; + assert_eq!(&buf[..n], b"\"hello\"\n"); + fd2.write_all(b"\"world\"\n").await?; + Ok::<_, std::io::Error>(()) + }); + + /* Similar to how ops would use the resource */ + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + + ipc.clone().write_msg(json!("hello")).await?; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let msgs = ipc.read_msg().await?; + assert_eq!(msgs, json!("world")); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json_multi() -> Result<(), Box> { + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + let mut buf = [0u8; 1024]; + let n = fd2.read(&mut buf).await?; + assert_eq!(&buf[..n], b"\"hello\"\n\"world\"\n"); + fd2.write_all(b"\"foo\"\n\"bar\"\n").await?; + Ok::<_, std::io::Error>(()) + }); + + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + ipc.clone().write_msg(json!("hello")).await?; + ipc.clone().write_msg(json!("world")).await?; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let msgs = ipc.read_msg().await?; + assert_eq!(msgs, json!("foo")); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json_invalid() -> Result<(), Box> { + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?; + Ok::<_, std::io::Error>(()) + }); + + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let _err = ipc.read_msg().await.unwrap_err(); + + child.await??; + + Ok(()) + } + + #[test] + fn memchr() { + let str = b"hello world"; + assert_eq!(super::memchr(b'h', str), Some(0)); + assert_eq!(super::memchr(b'w', str), Some(6)); + assert_eq!(super::memchr(b'd', str), Some(10)); + assert_eq!(super::memchr(b'x', str), None); + + let empty = b""; + assert_eq!(super::memchr(b'\n', empty), None); + } + } +} + +#[cfg(windows)] +mod windows { + use deno_core::error::AnyError; + use deno_core::op2; + + #[op2(fast)] + pub fn op_node_ipc_pipe() -> Result<(), AnyError> { + Err(deno_core::error::not_supported()) + } + + #[op2(async)] + pub async fn op_node_ipc_write() -> Result<(), AnyError> { + Err(deno_core::error::not_supported()) + } + + #[op2(async)] + pub async fn op_node_ipc_read() -> Result<(), AnyError> { + Err(deno_core::error::not_supported()) + } +} diff --git a/ext/node/ops/mod.rs b/ext/node/ops/mod.rs index ec4324da3b33a9..277e340df1e820 100644 --- a/ext/node/ops/mod.rs +++ b/ext/node/ops/mod.rs @@ -5,6 +5,7 @@ pub mod fs; pub mod http; pub mod http2; pub mod idna; +pub mod ipc; pub mod os; pub mod require; pub mod util; diff --git a/ext/node/polyfills/02_init.js b/ext/node/polyfills/02_init.js index e3061c95d6e5f5..e5a0279a57a13d 100644 --- a/ext/node/polyfills/02_init.js +++ b/ext/node/polyfills/02_init.js @@ -7,15 +7,12 @@ const requireImpl = internals.requireImpl; import { nodeGlobals } from "ext:deno_node/00_globals.js"; import "node:module"; -globalThis.nodeBootstrap = function (usesLocalNodeModulesDir, argv0) { - initialize(usesLocalNodeModulesDir, argv0); -}; - let initialized = false; function initialize( usesLocalNodeModulesDir, argv0, + ipcFd, ) { if (initialized) { throw Error("Node runtime already initialized"); @@ -41,6 +38,7 @@ function initialize( // but it's the only way to get `args` and `version` and this point. internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version); internals.__initWorkerThreads(); + internals.__setupChildProcessIpcChannel(ipcFd); // `Deno[Deno.internal].requireImpl` will be unreachable after this line. delete internals.requireImpl; } @@ -52,6 +50,8 @@ function loadCjsModule(moduleName, isMain, inspectBrk) { requireImpl.Module._load(moduleName, null, { main: isMain }); } +globalThis.nodeBootstrap = initialize; + internals.node = { initialize, loadCjsModule, diff --git a/ext/node/polyfills/child_process.ts b/ext/node/polyfills/child_process.ts index 94a1084473cf94..c7d007f4609916 100644 --- a/ext/node/polyfills/child_process.ts +++ b/ext/node/polyfills/child_process.ts @@ -10,6 +10,7 @@ import { ChildProcess, ChildProcessOptions, normalizeSpawnArguments, + setupChannel, type SpawnOptions, spawnSync as _spawnSync, type SpawnSyncOptions, @@ -821,6 +822,14 @@ export function execFileSync( return ret.stdout as string | Buffer; } +function setupChildProcessIpcChannel(fd: number) { + if (typeof fd != "number" || fd < 0) return; + setupChannel(process, fd); +} + +globalThis.__bootstrap.internals.__setupChildProcessIpcChannel = + setupChildProcessIpcChannel; + export default { fork, spawn, diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts index 04773a8b70756c..b9bf133969f2b0 100644 --- a/ext/node/polyfills/internal/child_process.ts +++ b/ext/node/polyfills/internal/child_process.ts @@ -44,6 +44,9 @@ import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs"; import process from "node:process"; +const core = globalThis.__bootstrap.core; +const ops = core.ops; + export function mapValues( record: Readonly>, transformer: (value: T) => O, @@ -167,12 +170,13 @@ export class ChildProcess extends EventEmitter { signal, windowsVerbatimArguments = false, } = options || {}; + const normalizedStdio = normalizeStdioOption(stdio); const [ stdin = "pipe", stdout = "pipe", stderr = "pipe", _channel, // TODO(kt3k): handle this correctly - ] = normalizeStdioOption(stdio); + ] = normalizedStdio; const [cmd, cmdArgs] = buildCommand( command, args || [], @@ -181,6 +185,8 @@ export class ChildProcess extends EventEmitter { this.spawnfile = cmd; this.spawnargs = [cmd, ...cmdArgs]; + const ipc = normalizedStdio.indexOf("ipc"); + const stringEnv = mapValues(env, (value) => value.toString()); try { this.#process = new Deno.Command(cmd, { @@ -191,6 +197,7 @@ export class ChildProcess extends EventEmitter { stdout: toDenoStdio(stdout), stderr: toDenoStdio(stderr), windowsRawArguments: windowsVerbatimArguments, + ipc, // internal }).spawn(); this.pid = this.#process.pid; @@ -249,6 +256,10 @@ export class ChildProcess extends EventEmitter { } } + if (typeof this.#process._pipeFd == "number") { + setupChannel(this, this.#process._pipeFd); + } + (async () => { const status = await this.#process.status; this.exitCode = status.code; @@ -1058,9 +1069,91 @@ function toDenoArgs(args: string[]): string[] { return denoArgs; } +export function setupChannel(target, channel) { + const ipc = ops.op_node_ipc_pipe(channel); + + async function readLoop() { + try { + while (true) { + if (!target.connected || target.killed) { + return; + } + const msg = await core.opAsync("op_node_ipc_read", ipc); + if (msg == null) { + // Channel closed. + target.disconnect(); + return; + } + + process.nextTick(handleMessage, msg); + } + } catch (err) { + if ( + err instanceof Deno.errors.Interrupted || + err instanceof Deno.errors.BadResource + ) { + return; + } + } + } + + function handleMessage(msg) { + target.emit("message", msg); + } + + target.send = function (message, handle, options, callback) { + if (typeof handle === "function") { + callback = handle; + handle = undefined; + options = undefined; + } else if (typeof options === "function") { + callback = options; + options = undefined; + } else if (options !== undefined) { + validateObject(options, "options"); + } + + options = { swallowErrors: false, ...options }; + + if (message === undefined) { + throw new TypeError("ERR_MISSING_ARGS", "message"); + } + + if (handle !== undefined) { + notImplemented("ChildProcess.send with handle"); + } + + core.opAsync("op_node_ipc_write", ipc, message) + .then(() => { + if (callback) { + process.nextTick(callback, null); + } + }); + }; + + target.connected = true; + + target.disconnect = function () { + if (!this.connected) { + this.emit("error", new Error("IPC channel is already disconnected")); + return; + } + + this.connected = false; + process.nextTick(() => { + core.close(ipc); + target.emit("disconnect"); + }); + }; + + // Start reading messages from the channel. + readLoop(); +} + export default { ChildProcess, normalizeSpawnArguments, stdioStringToArray, spawnSync, + setupChannel, }; diff --git a/ext/node/polyfills/process.ts b/ext/node/polyfills/process.ts index 575d8dfb14eefd..352d46f426f3c0 100644 --- a/ext/node/polyfills/process.ts +++ b/ext/node/polyfills/process.ts @@ -69,7 +69,6 @@ import { buildAllowedFlags } from "ext:deno_node/internal/process/per_thread.mjs const notImplementedEvents = [ "disconnect", - "message", "multipleResolves", "rejectionHandled", "worker", diff --git a/runtime/js/40_process.js b/runtime/js/40_process.js index b8e05ce5a7fb99..e628aeb4a61c77 100644 --- a/runtime/js/40_process.js +++ b/runtime/js/40_process.js @@ -159,6 +159,7 @@ function spawnChildInner(opFn, command, apiName, { stderr = "piped", signal = undefined, windowsRawArguments = false, + ipc = -1, } = {}) { const child = opFn({ cmd: pathFromURL(command), @@ -172,6 +173,7 @@ function spawnChildInner(opFn, command, apiName, { stdout, stderr, windowsRawArguments, + ipc, }, apiName); return new ChildProcess(illegalConstructorKey, { ...child, @@ -203,6 +205,12 @@ class ChildProcess { #waitPromise; #waitComplete = false; + #pipeFd; + // internal, used by ext/node + get _pipeFd() { + return this.#pipeFd; + } + #pid; get pid() { return this.#pid; @@ -239,6 +247,7 @@ class ChildProcess { stdinRid, stdoutRid, stderrRid, + pipeFd, // internal } = null) { if (key !== illegalConstructorKey) { throw new TypeError("Illegal constructor."); @@ -246,6 +255,7 @@ class ChildProcess { this.#rid = rid; this.#pid = pid; + this.#pipeFd = pipeFd; if (stdinRid !== null) { this.#stdin = writableStreamForRid(stdinRid); diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index 0469b38bfc9058..5b4b164a25ab8f 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -440,6 +440,7 @@ function bootstrapMainRuntime(runtimeOptions) { 3: inspectFlag, 5: hasNodeModulesDir, 6: maybeBinaryNpmCommandName, + 7: nodeIpcFd, } = runtimeOptions; performance.setTimeOrigin(DateNow()); @@ -545,7 +546,7 @@ function bootstrapMainRuntime(runtimeOptions) { ObjectDefineProperty(globalThis, "Deno", util.readOnly(finalDenoNs)); if (nodeBootstrap) { - nodeBootstrap(hasNodeModulesDir, maybeBinaryNpmCommandName); + nodeBootstrap(hasNodeModulesDir, maybeBinaryNpmCommandName, nodeIpcFd); } } diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs index 1fdd4bf4d507a3..6f89e55294a42d 100644 --- a/runtime/ops/process.rs +++ b/runtime/ops/process.rs @@ -141,6 +141,8 @@ pub struct SpawnArgs { uid: Option, #[cfg(windows)] windows_raw_arguments: bool, + #[cfg(unix)] + ipc: Option, #[serde(flatten)] stdio: ChildStdio, @@ -205,11 +207,18 @@ pub struct SpawnOutput { stderr: Option, } +type CreateCommand = ( + std::process::Command, + // TODO(@littledivy): Ideally this would return Option but we are dealing with file descriptors + // all the way until setupChannel which makes it easier to share code between parent and child fork. + Option, +); + fn create_command( state: &mut OpState, args: SpawnArgs, api_name: &str, -) -> Result { +) -> Result { state .borrow_mut::() .check_run(&args.cmd, api_name)?; @@ -245,15 +254,6 @@ fn create_command( if let Some(uid) = args.uid { command.uid(uid); } - #[cfg(unix)] - // TODO(bartlomieju): - #[allow(clippy::undocumented_unsafe_blocks)] - unsafe { - command.pre_exec(|| { - libc::setgroups(0, std::ptr::null()); - Ok(()) - }); - } command.stdin(args.stdio.stdin.as_stdio()); command.stdout(match args.stdio.stdout { @@ -265,7 +265,91 @@ fn create_command( value => value.as_stdio(), }); - Ok(command) + #[cfg(unix)] + // TODO(bartlomieju): + #[allow(clippy::undocumented_unsafe_blocks)] + unsafe { + if let Some(ipc) = args.ipc { + if ipc < 0 { + return Ok((command, None)); + } + // SockFlag is broken on macOS + // https://github.com/nix-rust/nix/issues/861 + let mut fds = [-1, -1]; + #[cfg(not(target_os = "macos"))] + let flags = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK; + + #[cfg(target_os = "macos")] + let flags = 0; + + let ret = libc::socketpair( + libc::AF_UNIX, + libc::SOCK_STREAM | flags, + 0, + fds.as_mut_ptr(), + ); + if ret != 0 { + return Err(std::io::Error::last_os_error().into()); + } + + if cfg!(target_os = "macos") { + let fcntl = + |fd: i32, flag: libc::c_int| -> Result<(), std::io::Error> { + let flags = libc::fcntl(fd, libc::F_GETFL, 0); + + if flags == -1 { + return Err(fail(fds)); + } + let ret = libc::fcntl(fd, libc::F_SETFL, flags | flag); + if ret == -1 { + return Err(fail(fds)); + } + Ok(()) + }; + + fn fail(fds: [i32; 2]) -> std::io::Error { + unsafe { + libc::close(fds[0]); + libc::close(fds[1]); + } + std::io::Error::last_os_error() + } + + // SOCK_NONBLOCK is not supported on macOS. + (fcntl)(fds[0], libc::O_NONBLOCK)?; + (fcntl)(fds[1], libc::O_NONBLOCK)?; + + // SOCK_CLOEXEC is not supported on macOS. + (fcntl)(fds[0], libc::FD_CLOEXEC)?; + (fcntl)(fds[1], libc::FD_CLOEXEC)?; + } + + let fd1 = fds[0]; + let fd2 = fds[1]; + + command.pre_exec(move || { + if ipc >= 0 { + let _fd = libc::dup2(fd2, ipc); + libc::close(fd2); + } + libc::setgroups(0, std::ptr::null()); + Ok(()) + }); + + /* One end returned to parent process (this) */ + let pipe_fd = Some(fd1); + + /* The other end passed to child process via DENO_CHANNEL_FD */ + command.env("DENO_CHANNEL_FD", format!("{}", ipc)); + + return Ok((command, pipe_fd)); + } + + Ok((command, None)) + } + + #[cfg(not(unix))] + return Ok((command, None)); } #[derive(Serialize)] @@ -276,11 +360,13 @@ struct Child { stdin_rid: Option, stdout_rid: Option, stderr_rid: Option, + pipe_fd: Option, } fn spawn_child( state: &mut OpState, command: std::process::Command, + pipe_fd: Option, ) -> Result { let mut command = tokio::process::Command::from(command); // TODO(@crowlkats): allow detaching processes. @@ -362,6 +448,7 @@ fn spawn_child( stdin_rid, stdout_rid, stderr_rid, + pipe_fd, }) } @@ -372,8 +459,8 @@ fn op_spawn_child( #[serde] args: SpawnArgs, #[string] api_name: String, ) -> Result { - let command = create_command(state, args, &api_name)?; - spawn_child(state, command) + let (command, pipe_fd) = create_command(state, args, &api_name)?; + spawn_child(state, command, pipe_fd) } #[op2(async)] @@ -402,7 +489,8 @@ fn op_spawn_sync( ) -> Result { let stdout = matches!(args.stdio.stdout, Stdio::Piped); let stderr = matches!(args.stdio.stderr, Stdio::Piped); - let mut command = create_command(state, args, "Deno.Command().outputSync()")?; + let (mut command, _) = + create_command(state, args, "Deno.Command().outputSync()")?; let output = command.output().with_context(|| { format!( "Failed to spawn '{}'", diff --git a/runtime/worker_bootstrap.rs b/runtime/worker_bootstrap.rs index 828bb3766b1283..8674190f3ea470 100644 --- a/runtime/worker_bootstrap.rs +++ b/runtime/worker_bootstrap.rs @@ -59,6 +59,7 @@ pub struct BootstrapOptions { pub inspect: bool, pub has_node_modules_dir: bool, pub maybe_binary_npm_command_name: Option, + pub node_ipc_fd: Option, } impl Default for BootstrapOptions { @@ -86,6 +87,7 @@ impl Default for BootstrapOptions { args: Default::default(), has_node_modules_dir: Default::default(), maybe_binary_npm_command_name: None, + node_ipc_fd: None, } } } @@ -115,6 +117,8 @@ struct BootstrapV8<'a>( bool, // maybe_binary_npm_command_name Option<&'a str>, + // node_ipc_fd + i32, ); impl BootstrapOptions { @@ -134,6 +138,7 @@ impl BootstrapOptions { self.enable_testing_features, self.has_node_modules_dir, self.maybe_binary_npm_command_name.as_deref(), + self.node_ipc_fd.unwrap_or(-1), ); bootstrap.serialize(ser).unwrap()