Skip to content

Commit

Permalink
node:child_process: support defining extra pipes
Browse files Browse the repository at this point in the history
  • Loading branch information
nektro committed Jan 3, 2024
1 parent 9f8ee7c commit e21bc30
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 69 deletions.
4 changes: 3 additions & 1 deletion src/bun.js/api/bun.classes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ export default [
signalCode: {
getter: "getSignalCode",
},

exited: {
getter: "getExited",
},
pipe_fds: {
getter: "getPipeFds",
},
},
}),
];
124 changes: 89 additions & 35 deletions src/bun.js/api/bun/subprocess.zig
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ pub const Subprocess = struct {
stdout: Readable,
stderr: Readable,
poll: Poll = Poll{ .poll_ref = null },
stdio_pipes: std.BoundedArray(Stdio.PipeExtra, 8) = .{},

exit_promise: JSC.Strong = .{},
on_exit_callback: JSC.Strong = .{},
Expand Down Expand Up @@ -700,6 +701,17 @@ pub const Subprocess = struct {
return JSValue.jsBoolean(this.hasKilled());
}

pub fn getPipeFds(
this: *Subprocess,
global: *JSGlobalObject,
) callconv(.C) JSValue {
const array = JSValue.createEmptyArray(global, this.stdio_pipes.len);
for (this.stdio_pipes.slice(), 0..) |item, i| {
array.putIndex(global, @intCast(i), JSValue.jsNumber(item.fd));
}
return array;
}

pub const BufferedInput = struct {
remain: []const u8 = "",
fd: bun.FileDescriptor = bun.invalid_fd,
Expand Down Expand Up @@ -1124,7 +1136,7 @@ pub const Subprocess = struct {

pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) !Writable {
switch (stdio) {
.pipe => {
.pipe => |maybe_readable| {
if (Environment.isWindows) @panic("TODO");
var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink);
sink.* = .{
Expand All @@ -1135,15 +1147,13 @@ pub const Subprocess = struct {
};
sink.mode = bun.S.IFIFO;
sink.watch(fd);
if (stdio == .pipe) {
if (stdio.pipe) |readable| {
return Writable{
.pipe_to_readable_stream = .{
.pipe = sink,
.readable_stream = readable,
},
};
}
if (maybe_readable) |readable| {
return Writable{
.pipe_to_readable_stream = .{
.pipe = sink,
.readable_stream = readable,
},
};
}

return Writable{ .pipe = sink };
Expand All @@ -1161,11 +1171,9 @@ pub const Subprocess = struct {
}
return Writable{ .buffered_input = buffered_input };
},

.memfd => {
return Writable{ .memfd = stdio.memfd };
},

.fd => {
return Writable{ .fd = bun.toFD(fd) };
},
Expand Down Expand Up @@ -1358,6 +1366,7 @@ pub const Subprocess = struct {
var args = args_;
var ipc_mode = IPCMode.none;
var ipc_callback: JSValue = .zero;
var stdio_pipes: std.BoundedArray(Stdio.PipeExtra, 8) = .{};

var windows_hide: if (Environment.isWindows) u1 else u0 = 0;

Expand Down Expand Up @@ -1508,11 +1517,28 @@ pub const Subprocess = struct {
if (!stdio_val.isEmptyOrUndefinedOrNull()) {
if (stdio_val.jsType().isArray()) {
var stdio_iter = stdio_val.arrayIterator(globalThis);
stdio_iter.len = @min(stdio_iter.len, 4);
var i: u32 = 0;
while (stdio_iter.next()) |value| : (i += 1) {
if (!extractStdio(globalThis, i, value, &stdio))
if (!extractStdio(globalThis, i, value, &stdio[i]))
return JSC.JSValue.jsUndefined();
if (i == 2)
break;
}
i += 1;

while (stdio_iter.next()) |value| : (i += 1) {
var new_item: Stdio = undefined;
if (!extractStdio(globalThis, i, value, &new_item))
return JSC.JSValue.jsUndefined();
switch (new_item) {
.pipe => {
stdio_pipes.append(.{
.fd = 0,
.fileno = @intCast(i),
}) catch @panic("requested more than 8 pipes");
},
else => {},
}
}
} else {
globalThis.throwInvalidArguments("stdio must be an array", .{});
Expand All @@ -1521,17 +1547,17 @@ pub const Subprocess = struct {
}
} else {
if (args.get(globalThis, "stdin")) |value| {
if (!extractStdio(globalThis, bun.posix.STDIN_FD, value, &stdio))
if (!extractStdio(globalThis, bun.posix.STDIN_FD, value, &stdio[bun.posix.STDIN_FD]))
return .zero;
}

if (args.get(globalThis, "stderr")) |value| {
if (!extractStdio(globalThis, bun.posix.STDERR_FD, value, &stdio))
if (!extractStdio(globalThis, bun.posix.STDERR_FD, value, &stdio[bun.posix.STDERR_FD]))
return .zero;
}

if (args.get(globalThis, "stdout")) |value| {
if (!extractStdio(globalThis, bun.posix.STDOUT_FD, value, &stdio))
if (!extractStdio(globalThis, bun.posix.STDOUT_FD, value, &stdio[bun.posix.STDOUT_FD]))
return .zero;
}
}
Expand Down Expand Up @@ -1797,6 +1823,28 @@ pub const Subprocess = struct {
bun.STDERR_FD,
) catch |err| return globalThis.handleError(err, "in configuring child stderr");

for (stdio_pipes.slice()) |*item| {
const maybe = blk: {
var fds: [2]c_int = undefined;
const rc = std.os.system.socketpair(os.AF.UNIX, os.SOCK.STREAM, 0, &fds);
switch (std.os.system.getErrno(rc)) {
.SUCCESS => {},
.AFNOSUPPORT => break :blk error.AddressFamilyNotSupported,
.FAULT => unreachable,
.MFILE => break :blk error.ProcessFdQuotaExceeded,
.NFILE => break :blk error.SystemFdQuotaExceeded,
.OPNOTSUPP => break :blk error.OperationNotSupported,
.PROTONOSUPPORT => break :blk error.ProtocolNotSupported,
else => |err| break :blk std.os.unexpectedErrno(err),
}
std.log.info("socketpair[{d}] = {{ {d}, {d} }}", .{ item.fileno, fds[0], fds[1] });
actions.dup2(fds[1], item.fileno) catch |err| break :blk err;
actions.close(fds[0]) catch |err| break :blk err;
item.fd = fds[0];
};
_ = maybe catch |err| return globalThis.handleError(err, "in configuring child stderr");
}

actions.chdir(cwd) catch |err| return globalThis.handleError(err, "in chdir()");

argv.append(allocator, null) catch {
Expand Down Expand Up @@ -1941,6 +1989,7 @@ pub const Subprocess = struct {
// stdout and stderr only uses allocator and default_max_buffer_size if they are pipes and not a array buffer
.stdout = Readable.init(stdio[bun.STDOUT_FD], stdout_pipe[0], jsc_vm.allocator, default_max_buffer_size),
.stderr = Readable.init(stdio[bun.STDERR_FD], stderr_pipe[0], jsc_vm.allocator, default_max_buffer_size),
.stdio_pipes = stdio_pipes,
.on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{},
.ipc_mode = ipc_mode,
// will be assigned in the block below
Expand Down Expand Up @@ -2080,12 +2129,13 @@ pub const Subprocess = struct {
const resource_usage = subprocess.createResourceUsageObject(globalThis);
subprocess.finalizeSync();

const sync_value = JSC.JSValue.createEmptyObject(globalThis, 4);
const sync_value = JSC.JSValue.createEmptyObject(globalThis, 5);

This comment has been minimized.

Copy link
@Jarred-Sumner

Jarred-Sumner Jan 3, 2024

Collaborator

wheres the fifth?

sync_value.put(globalThis, JSC.ZigString.static("exitCode"), JSValue.jsNumber(@as(i32, @intCast(exitCode))));
sync_value.put(globalThis, JSC.ZigString.static("stdout"), stdout);
sync_value.put(globalThis, JSC.ZigString.static("stderr"), stderr);
sync_value.put(globalThis, JSC.ZigString.static("success"), JSValue.jsBoolean(exitCode == 0));
sync_value.put(globalThis, JSC.ZigString.static("resourceUsage"), resource_usage);

return sync_value;
}

Expand Down Expand Up @@ -2330,6 +2380,11 @@ pub const Subprocess = struct {
array_buffer: JSC.ArrayBuffer.Strong,
memfd: bun.FileDescriptor,

const PipeExtra = struct {
fd: i32,
fileno: i32,
};

pub fn canUseMemfd(this: *const @This(), is_sync: bool) bool {
if (comptime !Environment.isLinux) {
return false;
Expand Down Expand Up @@ -2504,15 +2559,15 @@ pub const Subprocess = struct {
globalThis: *JSC.JSGlobalObject,
blob: JSC.WebCore.AnyBlob,
i: u32,
stdio_array: []Stdio,
out_stdio: *Stdio,
) bool {
const fd = bun.stdio(i);

if (blob.needsToReadFile()) {
if (blob.store()) |store| {
if (store.data.file.pathlike == .fd) {
if (store.data.file.pathlike.fd == fd) {
stdio_array[i] = Stdio{ .inherit = {} };
out_stdio.* = Stdio{ .inherit = {} };
} else {
switch (bun.FDTag.get(i)) {
.stdin => {
Expand All @@ -2521,7 +2576,6 @@ pub const Subprocess = struct {
return false;
}
},

.stdout, .stderr => {
if (i == 0) {
globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{});
Expand All @@ -2531,26 +2585,26 @@ pub const Subprocess = struct {
else => {},
}

stdio_array[i] = Stdio{ .fd = store.data.file.pathlike.fd };
out_stdio.* = Stdio{ .fd = store.data.file.pathlike.fd };
}

return true;
}

stdio_array[i] = .{ .path = store.data.file.pathlike.path };
out_stdio.* = .{ .path = store.data.file.pathlike.path };
return true;
}
}

stdio_array[i] = .{ .blob = blob };
out_stdio.* = .{ .blob = blob };
return true;
}

fn extractStdio(
globalThis: *JSC.JSGlobalObject,
i: u32,
value: JSValue,
stdio_array: []Stdio,
out_stdio: *Stdio,
) bool {
if (value.isEmptyOrUndefinedOrNull()) {
return true;
Expand All @@ -2559,11 +2613,11 @@ pub const Subprocess = struct {
if (value.isString()) {
const str = value.getZigString(globalThis);
if (str.eqlComptime("inherit")) {
stdio_array[i] = Stdio{ .inherit = {} };
out_stdio.* = Stdio{ .inherit = {} };
} else if (str.eqlComptime("ignore")) {
stdio_array[i] = Stdio{ .ignore = {} };
out_stdio.* = Stdio{ .ignore = {} };
} else if (str.eqlComptime("pipe")) {
stdio_array[i] = Stdio{ .pipe = null };
out_stdio.* = Stdio{ .pipe = null };
} else {
globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'pipe', 'ignore', Bun.file(pathOrFd), number, or null", .{});
return false;
Expand Down Expand Up @@ -2596,22 +2650,22 @@ pub const Subprocess = struct {
else => {},
}

stdio_array[i] = Stdio{ .fd = fd };
out_stdio.* = Stdio{ .fd = fd };

return true;
} else if (value.as(JSC.WebCore.Blob)) |blob| {
return extractStdioBlob(globalThis, .{ .Blob = blob.dupe() }, i, stdio_array);
return extractStdioBlob(globalThis, .{ .Blob = blob.dupe() }, i, out_stdio);
} else if (value.as(JSC.WebCore.Request)) |req| {
req.getBodyValue().toBlobIfPossible();
return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, stdio_array);
return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, out_stdio);
} else if (value.as(JSC.WebCore.Response)) |req| {
req.getBodyValue().toBlobIfPossible();
return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, stdio_array);
return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, out_stdio);
} else if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |req_const| {
var req = req_const;
if (i == bun.STDIN_FD) {
if (req.toAnyBlob(globalThis)) |blob| {
return extractStdioBlob(globalThis, blob, i, stdio_array);
return extractStdioBlob(globalThis, blob, i, out_stdio);
}

switch (req.ptr) {
Expand All @@ -2622,7 +2676,7 @@ pub const Subprocess = struct {
return false;
}

stdio_array[i] = .{ .pipe = req };
out_stdio.* = .{ .pipe = req };
return true;
},
else => {},
Expand All @@ -2637,7 +2691,7 @@ pub const Subprocess = struct {
return false;
}

stdio_array[i] = .{
out_stdio.* = .{
.array_buffer = JSC.ArrayBuffer.Strong{
.array_buffer = array_buffer,
.held = JSC.Strong.create(array_buffer.value, globalThis),
Expand Down
1 change: 1 addition & 0 deletions src/bun.js/bindings/ZigGlobalObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class InternalModuleRegistry;
#include "WebCoreJSBuiltins.h"
#include "headers-handwritten.h"
#include "BunCommonStrings.h"
#include <sys/socket.h>

This comment has been minimized.

Copy link
@Jarred-Sumner

Jarred-Sumner Jan 3, 2024

Collaborator

is this necessary?


namespace WebCore {
class GlobalScope;
Expand Down
14 changes: 8 additions & 6 deletions src/bun.js/webcore/streams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2296,7 +2296,14 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
const wait = callframe.argumentsCount() > 0 and
callframe.argument(0).isBoolean() and
callframe.argument(0).asBoolean();
return this.sink.flushFromJS(globalThis, wait).result;
const maybe_value: JSC.Node.Maybe(JSValue) = this.sink.flushFromJS(globalThis, wait);
return switch (maybe_value) {
.result => |value| value,
.err => |err| blk: {
globalThis.vm().throwError(globalThis, err.toJSC(globalThis));
break :blk JSC.JSValue.jsUndefined();
},
};
}

return this.sink.flush().toJS(globalThis);
Expand Down Expand Up @@ -4277,11 +4284,6 @@ pub const File = struct {
return .{ .err = Syscall.Error.fromCode(.ISDIR, .fstat) };
}

if (bun.S.ISSOCK(stat.mode)) {
_ = Syscall.close(fd);
return .{ .err = Syscall.Error.fromCode(.INVAL, .fstat) };
}

file.mode = @as(bun.Mode, @intCast(stat.mode));
this.mode = file.mode;

Expand Down
Loading

0 comments on commit e21bc30

Please sign in to comment.