Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: implement child_process IPC #21490

Merged
merged 37 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
11d3db3
Initial commit
littledivy Dec 6, 2023
2c5c6f8
More stuff
littledivy Dec 6, 2023
cb08343
Works
littledivy Dec 7, 2023
6d2cbdd
x
littledivy Dec 7, 2023
99384ca
Add tests
littledivy Dec 7, 2023
a0db846
review + redundant use of Rc
littledivy Dec 8, 2023
099001e
refactor resource
littledivy Dec 8, 2023
c2e0835
improve reader
littledivy Dec 8, 2023
9d96671
Fix disconnect
littledivy Dec 8, 2023
9c91aab
lint
littledivy Dec 8, 2023
7920ce9
cfg unix
littledivy Dec 8, 2023
799a9fb
win lint
littledivy Dec 8, 2023
c1acfb5
fix
littledivy Dec 8, 2023
d071fd8
x
littledivy Dec 9, 2023
eb7029c
faster
littledivy Dec 10, 2023
ce3865a
x
littledivy Dec 10, 2023
906006f
lint
littledivy Dec 10, 2023
dc1f96c
Merge branch 'main' of github.com:denoland/deno into node_ipc
littledivy Dec 10, 2023
a34d652
stuff
littledivy Dec 10, 2023
a70d90a
Add tests
littledivy Dec 10, 2023
3715227
gate simd mod
littledivy Dec 10, 2023
0b1edf5
x
littledivy Dec 10, 2023
6827225
Delete cli/tests/node_compat/require.ts
littledivy Dec 10, 2023
d89007e
x
littledivy Dec 10, 2023
2b63519
fix windows lint
littledivy Dec 10, 2023
48b7b36
Fx
littledivy Dec 10, 2023
dd518fb
Sdt
littledivy Dec 10, 2023
b96a507
fmt config.jsonc
littledivy Dec 11, 2023
ae04e52
x
littledivy Dec 11, 2023
a7581d7
fmt
littledivy Dec 11, 2023
735f799
x
littledivy Dec 11, 2023
9a74269
fix hang
littledivy Dec 12, 2023
64c1d84
Fix
littledivy Dec 12, 2023
76f8b46
fix
littledivy Dec 13, 2023
8cca33a
x
littledivy Dec 13, 2023
6ef9270
x
littledivy Dec 13, 2023
4c1f790
x
littledivy Dec 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions cli/args/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,17 @@ impl CliOptions {
.map(Some)
}

pub fn node_ipc_fd(&self) -> Option<i32> {
let maybe_node_channel_fd = std::env::var("DENO_CHANNEL_FD").ok();
if let Some(node_channel_fd) = maybe_node_channel_fd {
// Remove so that child processes don't inherit this environment variable.
std::env::remove_var("DENO_CHANNEL_FD");
node_channel_fd.parse::<i32>().ok()
} else {
None
}
}

pub fn resolve_main_module(&self) -> Result<ModuleSpecifier, AnyError> {
match &self.flags.subcommand {
DenoSubcommand::Bundle(bundle_flags) => {
Expand Down
1 change: 1 addition & 0 deletions cli/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ impl CliFactory {
self.maybe_lockfile().clone(),
self.feature_checker().clone(),
self.create_cli_main_worker_options()?,
self.options.node_ipc_fd(),
))
}

Expand Down
1 change: 1 addition & 0 deletions cli/standalone/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ pub async fn run(
unstable: metadata.unstable,
maybe_root_package_json_deps: package_json_deps_provider.deps().cloned(),
},
None,
);

v8_set_flags(construct_v8_flags(&[], &metadata.v8_flags, vec![]));
Expand Down
5 changes: 5 additions & 0 deletions cli/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ struct SharedWorkerState {
maybe_inspector_server: Option<Arc<InspectorServer>>,
maybe_lockfile: Option<Arc<Mutex<Lockfile>>>,
feature_checker: Arc<FeatureChecker>,
node_ipc: Option<i32>,
}

impl SharedWorkerState {
Expand Down Expand Up @@ -416,6 +417,7 @@ impl CliMainWorkerFactory {
maybe_lockfile: Option<Arc<Mutex<Lockfile>>>,
feature_checker: Arc<FeatureChecker>,
options: CliMainWorkerOptions,
node_ipc: Option<i32>,
) -> Self {
Self {
shared: Arc::new(SharedWorkerState {
Expand All @@ -436,6 +438,7 @@ impl CliMainWorkerFactory {
maybe_inspector_server,
maybe_lockfile,
feature_checker,
node_ipc,
}),
}
}
Expand Down Expand Up @@ -599,6 +602,7 @@ impl CliMainWorkerFactory {
.options
.maybe_binary_npm_command_name
.clone(),
node_ipc_fd: shared.node_ipc,
},
extensions,
startup_snapshot: crate::js::deno_isolate_init(),
Expand Down Expand Up @@ -793,6 +797,7 @@ fn create_web_worker_callback(
.options
.maybe_binary_npm_command_name
.clone(),
node_ipc_fd: None,
},
extensions,
startup_snapshot: crate::js::deno_isolate_init(),
Expand Down
1 change: 1 addition & 0 deletions ext/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ libc.workspace = true
libz-sys.workspace = true
md-5 = "0.10.5"
md4 = "0.10.2"
nix.workspace = true
num-bigint.workspace = true
num-bigint-dig = "0.8.2"
num-integer = "0.1.45"
Expand Down
3 changes: 3 additions & 0 deletions ext/node/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ deno_core::extension!(deno_node,
ops::require::op_require_break_on_next_statement,
ops::util::op_node_guess_handle_type,
ops::crypto::op_node_create_private_key,
ops::ipc::op_node_ipc_pipe,
ops::ipc::op_node_ipc_write,
ops::ipc::op_node_ipc_read,
],
esm_entry_point = "ext:deno_node/02_init.js",
esm = [
Expand Down
179 changes: 179 additions & 0 deletions ext/node/ops/ipc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

use std::cell::RefCell;
use std::io;
use std::os::fd::FromRawFd;
use std::os::fd::RawFd;
use std::rc::Rc;

use deno_core::error::bad_resource_id;
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::serde_json;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::ResourceId;
use tokio::net::UnixStream;

struct IpcPipe {
// Better name?
inner: UnixStream,
}

impl IpcPipe {
fn new(fd: RawFd) -> Result<Self, std::io::Error> {
Ok(Self {
inner: UnixStream::from_std(unsafe {
std::os::unix::net::UnixStream::from_raw_fd(fd)
})?,
})
}

async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, std::io::Error> {
littledivy marked this conversation as resolved.
Show resolved Hide resolved
let mut offset = 0;
loop {
self.inner.writable().await?;
match self.inner.try_write(&data[offset..]) {
Ok(n) => {
offset += n;
if offset >= data.len() {
return Ok(offset);
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => return Err(e),
}
}
}

async fn read(&self, data: &mut [u8]) -> Result<usize, std::io::Error> {
loop {
self.inner.readable().await?;
match self.inner.try_read(&mut data[..]) {
Ok(n) => {
return Ok(n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => return Err(e),
}
}
}
}

struct IpcJsonStream {
pipe: Rc<IpcPipe>,
buffer: RefCell<Vec<u8>>,
cancel: CancelHandle,
}

impl IpcJsonStream {
fn new(fd: RawFd) -> Result<Self, std::io::Error> {
Ok(Self {
pipe: Rc::new(IpcPipe::new(fd)?),
buffer: RefCell::new(Vec::new()),
cancel: CancelHandle::default(),
})
}

async fn read(self: Rc<Self>) -> Result<Vec<serde_json::Value>, AnyError> {
let mut buf = [0u8; 1024]; // TODO: Use a single growable buffer.
let mut msgs = Vec::new();
loop {
let n = self.pipe.read(&mut buf).await?;

let read = &buf[..n];
let mut chunk_boundary = 0;

for byte in read {
if *byte == b'\n' {
let chunk = &read[..chunk_boundary];
self.buffer.borrow_mut().extend_from_slice(chunk);

chunk_boundary = 0;
if chunk.is_empty() {
// Last chunk.
break;
}
msgs.push(serde_json::from_slice(&self.buffer.borrow())?);
self.buffer.borrow_mut().clear();
} else {
chunk_boundary += 1;
}
}
littledivy marked this conversation as resolved.
Show resolved Hide resolved

if chunk_boundary > 0 {
let buffer = &mut self.buffer.borrow_mut();
buffer.clear();
buffer.extend_from_slice(&read[..chunk_boundary]);
}

if !msgs.is_empty() {
return Ok(msgs);
}
}
}

async fn write(
self: Rc<Self>,
msg: serde_json::Value,
) -> Result<(), AnyError> {
let mut buf = Vec::new();
serde_json::to_writer(&mut buf, &msg)?;
buf.push(b'\n');
self.pipe.clone().write(&buf).await?;
littledivy marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}
}

impl deno_core::Resource for IpcJsonStream {
fn close(self: Rc<Self>) {
self.cancel.cancel();
}
}

#[op2(fast)]
#[smi]
pub fn op_node_ipc_pipe(
state: &mut OpState,
#[smi] fd: i32,
) -> Result<ResourceId, AnyError> {
Ok(state.resource_table.add(IpcJsonStream::new(fd)?))
}

#[op2(async)]
pub async fn op_node_ipc_write(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
#[serde] value: serde_json::Value,
) -> Result<(), AnyError> {
let stream = state
.borrow()
.resource_table
.get::<IpcJsonStream>(rid)
.map_err(|_| bad_resource_id())?;
stream.write(value).await?;
Ok(())
}

#[op2(async)]
#[serde]
pub async fn op_node_ipc_read(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<Vec<serde_json::Value>, AnyError> {
let stream = state
.borrow()
.resource_table
.get::<IpcJsonStream>(rid)
.map_err(|_| bad_resource_id())?;

let cancel = RcRef::map(stream.clone(), |r| &r.cancel);
let msgs = stream.read().or_cancel(cancel).await??;
Ok(msgs)
}
1 change: 1 addition & 0 deletions ext/node/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod crypto;
pub mod http;
pub mod http2;
pub mod idna;
pub mod ipc;
pub mod os;
pub mod require;
pub mod util;
Expand Down
8 changes: 4 additions & 4 deletions ext/node/polyfills/02_init.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,12 @@ const requireImpl = internals.requireImpl;
import { nodeGlobals } from "ext:deno_node/00_globals.js";
import "node:module";

globalThis.nodeBootstrap = function (usesLocalNodeModulesDir, argv0) {
initialize(usesLocalNodeModulesDir, argv0);
};

let initialized = false;

function initialize(
usesLocalNodeModulesDir,
argv0,
ipcFd,
) {
if (initialized) {
throw Error("Node runtime already initialized");
Expand All @@ -41,6 +38,7 @@ function initialize(
// but it's the only way to get `args` and `version` and this point.
internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version);
internals.__initWorkerThreads();
internals.__setupChildProcessIpcChannel(ipcFd);
// `Deno[Deno.internal].requireImpl` will be unreachable after this line.
delete internals.requireImpl;
}
Expand All @@ -52,6 +50,8 @@ function loadCjsModule(moduleName, isMain, inspectBrk) {
requireImpl.Module._load(moduleName, null, { main: isMain });
}

globalThis.nodeBootstrap = initialize;

internals.node = {
initialize,
loadCjsModule,
Expand Down
9 changes: 9 additions & 0 deletions ext/node/polyfills/child_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
ChildProcess,
ChildProcessOptions,
normalizeSpawnArguments,
setupChannel,
type SpawnOptions,
spawnSync as _spawnSync,
type SpawnSyncOptions,
Expand Down Expand Up @@ -821,6 +822,14 @@ export function execFileSync(
return ret.stdout as string | Buffer;
}

function setupChildProcessIpcChannel(fd: number) {
if (typeof fd != "number" || fd < 0) return;
setupChannel(process, fd);
}

globalThis.__bootstrap.internals.__setupChildProcessIpcChannel =
setupChildProcessIpcChannel;

export default {
fork,
spawn,
Expand Down
Loading
Loading