diff --git a/packages/bun-usockets/src/socket.c b/packages/bun-usockets/src/socket.c index 3199020728cd16..fdea7d2c6f1ff8 100644 --- a/packages/bun-usockets/src/socket.c +++ b/packages/bun-usockets/src/socket.c @@ -249,10 +249,6 @@ struct us_socket_t *us_socket_from_fd(struct us_socket_context_t *ctx, int socke us_internal_socket_context_link_socket(ctx, s); - if (ctx->on_open) { - ctx->on_open(s, 0, 0, 0); - } - return s; #endif } diff --git a/src/bun.js/api/bun.classes.ts b/src/bun.js/api/bun.classes.ts index 8dcecee2362f8c..3f529794ff5329 100644 --- a/src/bun.js/api/bun.classes.ts +++ b/src/bun.js/api/bun.classes.ts @@ -107,10 +107,12 @@ export default [ signalCode: { getter: "getSignalCode", }, - exited: { getter: "getExited", }, + stdio: { + getter: "getStdio", + }, }, }), ]; diff --git a/src/bun.js/api/bun/socket.zig b/src/bun.js/api/bun/socket.zig index a6445dab2dca71..9671913e013dbb 100644 --- a/src/bun.js/api/bun/socket.zig +++ b/src/bun.js/api/bun/socket.zig @@ -369,6 +369,12 @@ pub const SocketConfig = struct { } hostname_or_unix: { + if (opts.getTruthy(globalObject, "fd")) |fd_| { + if (fd_.isNumber()) { + break :hostname_or_unix; + } + } + if (opts.getTruthy(globalObject, "unix")) |unix_socket| { if (!unix_socket.isString()) { exception.* = JSC.toInvalidArguments("Expected \"unix\" to be a string", .{}, globalObject).asObjectRef(); @@ -497,6 +503,7 @@ pub const Listener = struct { host: []const u8, port: u16, }, + fd: bun.FileDescriptor, pub fn clone(this: UnixOrHost) UnixOrHost { switch (this) { @@ -506,8 +513,14 @@ pub const Listener = struct { }; }, .host => |h| { - return .{ .host = .{ .host = (bun.default_allocator.dupe(u8, h.host) catch unreachable), .port = this.host.port } }; + return .{ + .host = .{ + .host = (bun.default_allocator.dupe(u8, h.host) catch unreachable), + .port = this.host.port, + }, + }; }, + .fd => |f| return .{ .fd = f }, } } @@ -519,6 +532,7 @@ pub const Listener = struct { .host => |h| { bun.default_allocator.free(h.host); }, + .fd => {}, // this is an integer } } }; @@ -557,10 +571,7 @@ pub const Listener = struct { return JSValue.jsUndefined(); } - pub fn listen( - globalObject: *JSC.JSGlobalObject, - opts: JSValue, - ) JSValue { + pub fn listen(globalObject: *JSC.JSGlobalObject, opts: JSValue) JSValue { log("listen", .{}); var exception_ = [1]JSC.JSValueRef{null}; const exception: JSC.C.ExceptionRef = &exception_; @@ -581,7 +592,7 @@ pub const Listener = struct { var hostname_or_unix = socket_config.hostname_or_unix; const port = socket_config.port; var ssl = socket_config.ssl; - var handlers = &socket_config.handlers; + var handlers = socket_config.handlers; var protos: ?[]const u8 = null; const exclusive = socket_config.exclusive; handlers.is_server = true; @@ -690,6 +701,10 @@ pub const Listener = struct { defer bun.default_allocator.free(host); break :brk uws.us_socket_context_listen_unix(@intFromBool(ssl_enabled), socket_context, host, socket_flags, 8); }, + .fd => { + // don't call listen() on an fd + return .zero; + }, } } orelse { defer { @@ -716,7 +731,7 @@ pub const Listener = struct { }; var socket = Listener{ - .handlers = handlers.*, + .handlers = handlers, .connection = connection, .ssl = ssl_enabled, .socket_context = socket_context, @@ -869,7 +884,6 @@ pub const Listener = struct { if (this.connection != .host) { return JSValue.jsUndefined(); } - return ZigString.init(this.connection.host.host).withEncoding().toValueGC(globalObject); } @@ -877,7 +891,6 @@ pub const Listener = struct { if (this.connection != .host) { return JSValue.jsUndefined(); } - return JSValue.jsNumber(this.connection.host.port); } @@ -943,10 +956,17 @@ pub const Listener = struct { return .zero; }; - const connection: Listener.UnixOrHost = if (port) |port_| .{ - .host = .{ .host = (hostname_or_unix.cloneIfNeeded(bun.default_allocator) catch unreachable).slice(), .port = port_ }, - } else .{ - .unix = (hostname_or_unix.cloneIfNeeded(bun.default_allocator) catch unreachable).slice(), + const connection: Listener.UnixOrHost = blk: { + if (opts.getTruthy(globalObject, "fd")) |fd_| { + if (fd_.isNumber()) { + const fd: bun.FileDescriptor = fd_.asInt32(); + break :blk .{ .fd = fd }; + } + } + if (port) |_| { + break :blk .{ .host = .{ .host = (hostname_or_unix.cloneIfNeeded(bun.default_allocator) catch unreachable).slice(), .port = port.? } }; + } + break :blk .{ .unix = (hostname_or_unix.cloneIfNeeded(bun.default_allocator) catch unreachable).slice() }; }; if (ssl_enabled) { @@ -1147,24 +1167,28 @@ fn NewSocket(comptime ssl: bool) type { pub fn doConnect(this: *This, connection: Listener.UnixOrHost, socket_ctx: *uws.SocketContext) !void { switch (connection) { .host => |c| { - _ = @This().Socket.connectPtr( + _ = This.Socket.connectPtr( normalizeHost(c.host), c.port, socket_ctx, - @This(), + This, this, "socket", ) orelse return error.ConnectionFailed; }, .unix => |u| { - _ = @This().Socket.connectUnixPtr( + _ = This.Socket.connectUnixPtr( u, socket_ctx, - @This(), + This, this, "socket", ) orelse return error.ConnectionFailed; }, + .fd => |f| { + const socket = This.Socket.fromFd(socket_ctx, f, This, this, "socket") orelse return error.ConnectionFailed; + this.onOpen(socket); + }, } } diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 8f34db0162cf11..8dc1ea29c7f96b 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -154,6 +154,7 @@ pub const Subprocess = struct { stdout: Readable, stderr: Readable, poll: Poll = Poll{ .poll_ref = null }, + stdio_pipes: std.ArrayListUnmanaged(Stdio.PipeExtra) = .{}, exit_promise: JSC.Strong = .{}, on_exit_callback: JSC.Strong = .{}, @@ -700,6 +701,23 @@ pub const Subprocess = struct { return JSValue.jsBoolean(this.hasKilled()); } + pub fn getStdio( + this: *Subprocess, + global: *JSGlobalObject, + ) callconv(.C) JSValue { + const array = JSValue.createEmptyArray(global, 0); + array.push(global, .null); // TODO: align this with options + array.push(global, .null); // TODO: align this with options + array.push(global, .null); // TODO: align this with options + + for (this.stdio_pipes.items) |item| { + const uno: u32 = @intCast(item.fileno); + for (0..array.getLength(global) - uno) |_| array.push(global, .null); + array.push(global, JSValue.jsNumber(item.fd)); + } + return array; + } + pub const BufferedInput = struct { remain: []const u8 = "", fd: bun.FileDescriptor = bun.invalid_fd, @@ -1124,7 +1142,7 @@ pub const Subprocess = struct { pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) !Writable { switch (stdio) { - .pipe => { + .pipe => |maybe_readable| { if (Environment.isWindows) @panic("TODO"); var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink); sink.* = .{ @@ -1135,15 +1153,13 @@ pub const Subprocess = struct { }; sink.mode = bun.S.IFIFO; sink.watch(fd); - if (stdio == .pipe) { - if (stdio.pipe) |readable| { - return Writable{ - .pipe_to_readable_stream = .{ - .pipe = sink, - .readable_stream = readable, - }, - }; - } + if (maybe_readable) |readable| { + return Writable{ + .pipe_to_readable_stream = .{ + .pipe = sink, + .readable_stream = readable, + }, + }; } return Writable{ .pipe = sink }; @@ -1161,11 +1177,9 @@ pub const Subprocess = struct { } return Writable{ .buffered_input = buffered_input }; }, - .memfd => { return Writable{ .memfd = stdio.memfd }; }, - .fd => { return Writable{ .fd = bun.toFD(fd) }; }, @@ -1255,6 +1269,7 @@ pub const Subprocess = struct { this.exit_promise.deinit(); this.on_exit_callback.deinit(); + this.stdio_pipes.deinit(bun.default_allocator); } pub fn finalize(this: *Subprocess) callconv(.C) void { @@ -1358,6 +1373,7 @@ pub const Subprocess = struct { var args = args_; var ipc_mode = IPCMode.none; var ipc_callback: JSValue = .zero; + var stdio_pipes: std.ArrayListUnmanaged(Stdio.PipeExtra) = .{}; var windows_hide: if (Environment.isWindows) u1 else u0 = 0; @@ -1508,11 +1524,31 @@ pub const Subprocess = struct { if (!stdio_val.isEmptyOrUndefinedOrNull()) { if (stdio_val.jsType().isArray()) { var stdio_iter = stdio_val.arrayIterator(globalThis); - stdio_iter.len = @min(stdio_iter.len, 4); var i: u32 = 0; while (stdio_iter.next()) |value| : (i += 1) { - if (!extractStdio(globalThis, i, value, &stdio)) + if (!extractStdio(globalThis, i, value, &stdio[i])) + return JSC.JSValue.jsUndefined(); + if (i == 2) + break; + } + i += 1; + + while (stdio_iter.next()) |value| : (i += 1) { + var new_item: Stdio = undefined; + if (!extractStdio(globalThis, i, value, &new_item)) return JSC.JSValue.jsUndefined(); + switch (new_item) { + .pipe => { + stdio_pipes.append(bun.default_allocator, .{ + .fd = 0, + .fileno = @intCast(i), + }) catch { + globalThis.throwOutOfMemory(); + return .zero; + }; + }, + else => {}, + } } } else { globalThis.throwInvalidArguments("stdio must be an array", .{}); @@ -1521,17 +1557,17 @@ pub const Subprocess = struct { } } else { if (args.get(globalThis, "stdin")) |value| { - if (!extractStdio(globalThis, bun.posix.STDIN_FD, value, &stdio)) + if (!extractStdio(globalThis, bun.posix.STDIN_FD, value, &stdio[bun.posix.STDIN_FD])) return .zero; } if (args.get(globalThis, "stderr")) |value| { - if (!extractStdio(globalThis, bun.posix.STDERR_FD, value, &stdio)) + if (!extractStdio(globalThis, bun.posix.STDERR_FD, value, &stdio[bun.posix.STDERR_FD])) return .zero; } if (args.get(globalThis, "stdout")) |value| { - if (!extractStdio(globalThis, bun.posix.STDOUT_FD, value, &stdio)) + if (!extractStdio(globalThis, bun.posix.STDOUT_FD, value, &stdio[bun.posix.STDOUT_FD])) return .zero; } } @@ -1797,6 +1833,33 @@ pub const Subprocess = struct { bun.STDERR_FD, ) catch |err| return globalThis.handleError(err, "in configuring child stderr"); + for (stdio_pipes.items) |*item| { + const maybe = blk: { + var fds: [2]c_int = undefined; + const socket_type = os.SOCK.STREAM; + const rc = std.os.system.socketpair(os.AF.UNIX, socket_type, 0, &fds); + switch (std.os.system.getErrno(rc)) { + .SUCCESS => {}, + .AFNOSUPPORT => break :blk error.AddressFamilyNotSupported, + .FAULT => break :blk error.Fault, + .MFILE => break :blk error.ProcessFdQuotaExceeded, + .NFILE => break :blk error.SystemFdQuotaExceeded, + .OPNOTSUPP => break :blk error.OperationNotSupported, + .PROTONOSUPPORT => break :blk error.ProtocolNotSupported, + else => |err| break :blk std.os.unexpectedErrno(err), + } + actions.dup2(fds[1], item.fileno) catch |err| break :blk err; + actions.close(fds[1]) catch |err| break :blk err; + item.fd = fds[0]; + // enable non-block + const before = std.c.fcntl(fds[0], os.F.GETFL); + _ = std.c.fcntl(fds[0], os.F.SETFL, before | os.O.NONBLOCK); + // enable SOCK_CLOXEC + _ = std.c.fcntl(fds[0], os.FD_CLOEXEC); + }; + _ = maybe catch |err| return globalThis.handleError(err, "in configuring child stderr"); + } + actions.chdir(cwd) catch |err| return globalThis.handleError(err, "in chdir()"); argv.append(allocator, null) catch { @@ -1851,14 +1914,15 @@ pub const Subprocess = struct { if (stdio[0].isPiped()) { _ = bun.sys.close(stdin_pipe[0]); } - if (stdio[1].isPiped()) { _ = bun.sys.close(stdout_pipe[1]); } - if (stdio[2].isPiped()) { _ = bun.sys.close(stderr_pipe[1]); } + for (stdio_pipes.items) |item| { + _ = bun.sys.close(item.fd + 1); + } } break :brk switch (PosixSpawn.spawnZ(argv.items[0].?, actions, attr, @as([*:null]?[*:0]const u8, @ptrCast(argv.items[0..].ptr)), env)) { @@ -1941,6 +2005,7 @@ pub const Subprocess = struct { // stdout and stderr only uses allocator and default_max_buffer_size if they are pipes and not a array buffer .stdout = Readable.init(stdio[bun.STDOUT_FD], stdout_pipe[0], jsc_vm.allocator, default_max_buffer_size), .stderr = Readable.init(stdio[bun.STDERR_FD], stderr_pipe[0], jsc_vm.allocator, default_max_buffer_size), + .stdio_pipes = stdio_pipes, .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{}, .ipc_mode = ipc_mode, // will be assigned in the block below @@ -2080,12 +2145,13 @@ pub const Subprocess = struct { const resource_usage = subprocess.createResourceUsageObject(globalThis); subprocess.finalizeSync(); - const sync_value = JSC.JSValue.createEmptyObject(globalThis, 4); + const sync_value = JSC.JSValue.createEmptyObject(globalThis, 5); sync_value.put(globalThis, JSC.ZigString.static("exitCode"), JSValue.jsNumber(@as(i32, @intCast(exitCode)))); sync_value.put(globalThis, JSC.ZigString.static("stdout"), stdout); sync_value.put(globalThis, JSC.ZigString.static("stderr"), stderr); sync_value.put(globalThis, JSC.ZigString.static("success"), JSValue.jsBoolean(exitCode == 0)); sync_value.put(globalThis, JSC.ZigString.static("resourceUsage"), resource_usage); + return sync_value; } @@ -2330,6 +2396,11 @@ pub const Subprocess = struct { array_buffer: JSC.ArrayBuffer.Strong, memfd: bun.FileDescriptor, + const PipeExtra = struct { + fd: i32, + fileno: i32, + }; + pub fn canUseMemfd(this: *const @This(), is_sync: bool) bool { if (comptime !Environment.isLinux) { return false; @@ -2504,7 +2575,7 @@ pub const Subprocess = struct { globalThis: *JSC.JSGlobalObject, blob: JSC.WebCore.AnyBlob, i: u32, - stdio_array: []Stdio, + out_stdio: *Stdio, ) bool { const fd = bun.stdio(i); @@ -2512,7 +2583,7 @@ pub const Subprocess = struct { if (blob.store()) |store| { if (store.data.file.pathlike == .fd) { if (store.data.file.pathlike.fd == fd) { - stdio_array[i] = Stdio{ .inherit = {} }; + out_stdio.* = Stdio{ .inherit = {} }; } else { switch (bun.FDTag.get(i)) { .stdin => { @@ -2521,7 +2592,6 @@ pub const Subprocess = struct { return false; } }, - .stdout, .stderr => { if (i == 0) { globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{}); @@ -2531,18 +2601,18 @@ pub const Subprocess = struct { else => {}, } - stdio_array[i] = Stdio{ .fd = store.data.file.pathlike.fd }; + out_stdio.* = Stdio{ .fd = store.data.file.pathlike.fd }; } return true; } - stdio_array[i] = .{ .path = store.data.file.pathlike.path }; + out_stdio.* = .{ .path = store.data.file.pathlike.path }; return true; } } - stdio_array[i] = .{ .blob = blob }; + out_stdio.* = .{ .blob = blob }; return true; } @@ -2550,7 +2620,7 @@ pub const Subprocess = struct { globalThis: *JSC.JSGlobalObject, i: u32, value: JSValue, - stdio_array: []Stdio, + out_stdio: *Stdio, ) bool { if (value.isEmptyOrUndefinedOrNull()) { return true; @@ -2559,11 +2629,13 @@ pub const Subprocess = struct { if (value.isString()) { const str = value.getZigString(globalThis); if (str.eqlComptime("inherit")) { - stdio_array[i] = Stdio{ .inherit = {} }; + out_stdio.* = Stdio{ .inherit = {} }; } else if (str.eqlComptime("ignore")) { - stdio_array[i] = Stdio{ .ignore = {} }; + out_stdio.* = Stdio{ .ignore = {} }; } else if (str.eqlComptime("pipe")) { - stdio_array[i] = Stdio{ .pipe = null }; + out_stdio.* = Stdio{ .pipe = null }; + } else if (str.eqlComptime("ipc")) { + out_stdio.* = Stdio{ .pipe = null }; // TODO: } else { globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'pipe', 'ignore', Bun.file(pathOrFd), number, or null", .{}); return false; @@ -2596,22 +2668,22 @@ pub const Subprocess = struct { else => {}, } - stdio_array[i] = Stdio{ .fd = fd }; + out_stdio.* = Stdio{ .fd = fd }; return true; } else if (value.as(JSC.WebCore.Blob)) |blob| { - return extractStdioBlob(globalThis, .{ .Blob = blob.dupe() }, i, stdio_array); + return extractStdioBlob(globalThis, .{ .Blob = blob.dupe() }, i, out_stdio); } else if (value.as(JSC.WebCore.Request)) |req| { req.getBodyValue().toBlobIfPossible(); - return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, stdio_array); + return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, out_stdio); } else if (value.as(JSC.WebCore.Response)) |req| { req.getBodyValue().toBlobIfPossible(); - return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, stdio_array); + return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, out_stdio); } else if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |req_const| { var req = req_const; if (i == bun.STDIN_FD) { if (req.toAnyBlob(globalThis)) |blob| { - return extractStdioBlob(globalThis, blob, i, stdio_array); + return extractStdioBlob(globalThis, blob, i, out_stdio); } switch (req.ptr) { @@ -2622,7 +2694,7 @@ pub const Subprocess = struct { return false; } - stdio_array[i] = .{ .pipe = req }; + out_stdio.* = .{ .pipe = req }; return true; }, else => {}, @@ -2637,7 +2709,7 @@ pub const Subprocess = struct { return false; } - stdio_array[i] = .{ + out_stdio.* = .{ .array_buffer = JSC.ArrayBuffer.Strong{ .array_buffer = array_buffer, .held = JSC.Strong.create(array_buffer.value, globalThis), diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index fc97594dafca81..815947289fec44 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -3056,18 +3056,15 @@ pub const VirtualMachine = struct { const context = uws.us_create_socket_context(0, this.event_loop_handle.?, @sizeOf(usize), .{}).?; IPC.Socket.configure(context, true, *IPCInstance, IPCInstance.Handlers); - const socket = uws.newSocketFromFd(context, @sizeOf(*IPCInstance), fd) orelse { - uws.us_socket_context_free(0, context); - Output.warn("Failed to initialize IPC connection to parent", .{}); - return; - }; - var instance = bun.default_allocator.create(IPCInstance) catch @panic("OOM"); instance.* = .{ .globalThis = this.global, .uws_context = context, - .ipc = .{ .socket = socket }, + .ipc = undefined, }; + const socket = IPC.Socket.fromFd(context, fd, IPCInstance, instance, null) orelse @panic("Unable to start IPC"); + instance.ipc = .{ .socket = socket }; + const ptr = socket.ext(*IPCInstance); ptr.?.* = instance; this.ipc = instance; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 23dcb7d8598d27..a3a700f6fcf9a2 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -2296,7 +2296,14 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { const wait = callframe.argumentsCount() > 0 and callframe.argument(0).isBoolean() and callframe.argument(0).asBoolean(); - return this.sink.flushFromJS(globalThis, wait).result; + const maybe_value: JSC.Node.Maybe(JSValue) = this.sink.flushFromJS(globalThis, wait); + return switch (maybe_value) { + .result => |value| value, + .err => |err| blk: { + globalThis.vm().throwError(globalThis, err.toJSC(globalThis)); + break :blk JSC.JSValue.jsUndefined(); + }, + }; } return this.sink.flush().toJS(globalThis); @@ -4277,11 +4284,6 @@ pub const File = struct { return .{ .err = Syscall.Error.fromCode(.ISDIR, .fstat) }; } - if (bun.S.ISSOCK(stat.mode)) { - _ = Syscall.close(fd); - return .{ .err = Syscall.Error.fromCode(.INVAL, .fstat) }; - } - file.mode = @as(bun.Mode, @intCast(stat.mode)); this.mode = file.mode; diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 1e8ef310e8c232..31ee56c842a2b0 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -443,6 +443,28 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type { return ctx; } + pub fn fromFd( + ctx: *SocketContext, + handle: bun.FileDescriptor, + comptime This: type, + this: *This, + comptime socket_field_name: ?[]const u8, + ) ?ThisSocket { + const socket_ = ThisSocket{ .socket = us_socket_from_fd(ctx, @sizeOf(*anyopaque), handle) orelse return null }; + + const holder = socket_.ext(*anyopaque) orelse { + if (comptime bun.Environment.allow_assert) unreachable; + return null; + }; + holder.* = this; + + if (comptime socket_field_name) |field| { + @field(this, field) = socket_; + } + + return socket_; + } + pub fn connectUnixPtr( path: []const u8, socket_ctx: *SocketContext, @@ -2564,7 +2586,7 @@ extern fn us_socket_pair( extern fn us_socket_from_fd( ctx: *SocketContext, ext_size: c_int, - fds: LIBUS_SOCKET_DESCRIPTOR, + fd: LIBUS_SOCKET_DESCRIPTOR, ) ?*Socket; pub fn newSocketFromPair(ctx: *SocketContext, ext_size: c_int, fds: *[2]LIBUS_SOCKET_DESCRIPTOR) ?SocketTCP { @@ -2573,10 +2595,4 @@ pub fn newSocketFromPair(ctx: *SocketContext, ext_size: c_int, fds: *[2]LIBUS_SO }; } -pub fn newSocketFromFd(ctx: *SocketContext, ext_size: c_int, fd: LIBUS_SOCKET_DESCRIPTOR) ?SocketTCP { - return SocketTCP{ - .socket = us_socket_from_fd(ctx, ext_size, fd) orelse return null, - }; -} - extern fn us_socket_get_error(ssl_flag: c_int, socket: *Socket) c_int; diff --git a/src/js/node/child_process.js b/src/js/node/child_process.js index 1c1b5a87363253..4876010eab2988 100644 --- a/src/js/node/child_process.js +++ b/src/js/node/child_process.js @@ -3,6 +3,8 @@ const EventEmitter = require("node:events"); const StreamModule = require("node:stream"); const OsModule = require("node:os"); +var NetModule; + var ObjectCreate = Object.create; var ObjectAssign = Object.assign; var ObjectDefineProperty = Object.defineProperty; @@ -187,6 +189,9 @@ function spawn(file, args, options) { abortChildProcess(child, killSignal, options.signal.reason); } } + process.nextTick(() => { + child.emit("spawn"); + }); return child; } @@ -1048,6 +1053,7 @@ class ChildProcess extends EventEmitter { NativeWritable ||= StreamModule.NativeWritable; ReadableFromWeb ||= StreamModule.Readable.fromWeb; + if (!NetModule) NetModule = require("node:net"); const io = this.#stdioOptions[i]; switch (i) { @@ -1076,6 +1082,14 @@ class ChildProcess extends EventEmitter { return null; } } + default: + switch (io) { + case "pipe": + const fd = this.#handle.stdio[i]; + if (!fd) return null; + return new NetModule.connect({ fd }); + } + return null; } } @@ -1087,17 +1101,29 @@ class ChildProcess extends EventEmitter { #stdioOptions; #createStdioObject() { - return Object.create(null, { - 0: { - get: () => this.stdin, - }, - 1: { - get: () => this.stdout, - }, - 2: { - get: () => this.stderr, - }, - }); + let result = {}; + for (let i = 0; i < this.#stdioOptions.length; i++) { + const element = this.#stdioOptions[i]; + if (element !== "pipe") { + result[i] = null; + continue; + } + switch (i) { + case 0: + result[i] = this.stdin; + continue; + case 1: + result[i] = this.stdout; + continue; + case 2: + result[i] = this.stderr; + continue; + default: + result[i] = this.#getBunSpawnIo(i, this.#encoding); + continue; + } + } + return ObjectCreate.$call(null, result); } get stdin() { @@ -1164,15 +1190,18 @@ class ChildProcess extends EventEmitter { const detachedOption = options.detached; this.#encoding = options.encoding || undefined; this.#stdioOptions = bunStdio; + const stdioCount = stdio.length; + const hasSocketsToEagerlyLoad = stdioCount >= 3; this.#handle = Bun.spawn({ cmd: spawnargs, - stdin: bunStdio[0], - stdout: bunStdio[1], - stderr: bunStdio[2], + stdio: bunStdio, cwd: options.cwd || undefined, env: env || process.env, detached: typeof detachedOption !== "undefined" ? !!detachedOption : false, onExit: (handle, exitCode, signalCode, err) => { + if (hasSocketsToEagerlyLoad) { + this.stdio; + } $debug("ChildProcess: onExit", exitCode, signalCode, err, this.pid); this.#handle = handle; this.pid = this.#handle.pid; @@ -1197,6 +1226,10 @@ class ChildProcess extends EventEmitter { this.send = this.#send; this.disconnect = this.#disconnect; } + + if (hasSocketsToEagerlyLoad) { + this.stdio; + } } #emitIpcMessage(message) { @@ -1291,6 +1324,7 @@ const nodeToBunLookup = { pipe: "pipe", overlapped: "pipe", // TODO: this may need to work differently for Windows inherit: "inherit", + ipc: "ipc", }; function nodeToBun(item) { @@ -1366,9 +1400,9 @@ function normalizeStdio(stdio) { if (stdio.length === 0) processedStdio = ["pipe", "pipe", "pipe"]; else if (stdio.length === 1) processedStdio = [stdio[0], "pipe", "pipe"]; else if (stdio.length === 2) processedStdio = [stdio[0], stdio[1], "pipe"]; - else if (stdio.length >= 3) processedStdio = [stdio[0], stdio[1], stdio[2]]; + else if (stdio.length >= 3) processedStdio = stdio; - return processedStdio.map(item => (!item ? "pipe" : item)); + return processedStdio; } else { throw new ERR_INVALID_OPT_VALUE("stdio", stdio); } diff --git a/src/js/node/net.js b/src/js/node/net.js index 1939ba573d1702..8acec7ff503ab8 100644 --- a/src/js/node/net.js +++ b/src/js/node/net.js @@ -415,6 +415,7 @@ const Socket = (function (InternalSocket) { } if (typeof port == "object") { var { + fd, port, host, path, @@ -440,13 +441,31 @@ const Socket = (function (InternalSocket) { if (socket) { connection = socket; } + if (fd) { + bunConnect({ + data: this, + fd, + socket: Socket.#Handlers, + tls, + }).catch(error => { + this.emit("error", error); + this.emit("close"); + }); + } } this.pauseOnConnect = pauseOnConnect; if (!pauseOnConnect) { - this.resume(); + process.nextTick(() => { + this.resume(); + }); + this.connecting = true; } - this.connecting = true; + + if (fd) { + return this; + } + this.remotePort = port; const bunTLS = this[bunTlsSymbol]; diff --git a/src/js/node/stream.js b/src/js/node/stream.js index 5297c033c59834..befbe2c88fd7bf 100644 --- a/src/js/node/stream.js +++ b/src/js/node/stream.js @@ -42,7 +42,7 @@ const validateObject = (value, name, options = null) => { const nullable = options?.nullable ?? false; if ( (!nullable && value === null) || - (!allowArray && ArrayIsArray(value)) || + (!allowArray && $isJSArray(value)) || (typeof value !== "object" && (!allowFunction || typeof value !== "function")) ) { throw new ERR_INVALID_ARG_TYPE(name, "Object", value); @@ -61,8 +61,6 @@ function validateString(value, name) { if (typeof value !== "string") throw new ERR_INVALID_ARG_TYPE(name, "string", value); } -var ArrayIsArray = Array.isArray; - //------------------------------------------------------------------------------ // Node error polyfills //------------------------------------------------------------------------------ @@ -716,14 +714,14 @@ var require_validators = __commonJS({ const nullable = useDefaultOptions ? false : options.nullable; if ( (!nullable && value === null) || - (!allowArray && ArrayIsArray(value)) || + (!allowArray && $isJSArray(value)) || (typeof value !== "object" && (!allowFunction || typeof value !== "function")) ) { throw new ERR_INVALID_ARG_TYPE(name, "Object", value); } }); var validateArray = hideStackFrames((value, name, minLength = 0) => { - if (!ArrayIsArray(value)) { + if (!$isJSArray(value)) { throw new ERR_INVALID_ARG_TYPE(name, "Array", value); } if (value.length < minLength) { @@ -2064,7 +2062,7 @@ var require_legacy = __commonJS({ function prependListener(emitter, event, fn) { if (typeof emitter.prependListener === "function") return emitter.prependListener(event, fn); if (!emitter._events || !emitter._events[event]) emitter.on(event, fn); - else if (ArrayIsArray(emitter._events[event])) emitter._events[event].unshift(fn); + else if ($isJSArray(emitter._events[event])) emitter._events[event].unshift(fn); else emitter._events[event] = [fn, emitter._events[event]]; } module.exports = { @@ -2804,6 +2802,7 @@ var require_readable = __commonJS({ } } + $debug("length", state.length, state.ended, nOrig, n); if (state.length === 0) { // If we have nothing in the buffer, then we want to know // as soon as we *do* get something into the buffer. @@ -4047,7 +4046,6 @@ var require_duplexify = __commonJS({ } = require_errors(); var { destroyer } = require_destroy(); var Duplex = require_duplex(); - var Readable = require_readable(); var { createDeferredPromise } = require_util(); var from = require_from(); var isBlob = @@ -4383,8 +4381,6 @@ var require_duplex = __commonJS({ var { ObjectDefineProperties, ObjectGetOwnPropertyDescriptor, ObjectKeys, ObjectSetPrototypeOf } = require_primordials(); - var Readable = require_readable(); - function Duplex(options) { if (!(this instanceof Duplex)) return new Duplex(options); Readable.$call(this, options); @@ -4470,7 +4466,6 @@ var require_transform = __commonJS({ "use strict"; var { ObjectSetPrototypeOf, Symbol: Symbol2 } = require_primordials(); var { ERR_METHOD_NOT_IMPLEMENTED } = require_errors().codes; - var Duplex = require_duplex(); function Transform(options) { if (!(this instanceof Transform)) return new Transform(options); Duplex.$call(this, options); @@ -4593,7 +4588,6 @@ var require_pipeline = __commonJS({ var eos = require_end_of_stream(); var { once } = require_util(); var destroyImpl = require_destroy(); - var Duplex = require_duplex(); var { aggregateTwoErrors, codes: { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED }, @@ -4602,7 +4596,6 @@ var require_pipeline = __commonJS({ var { validateFunction, validateAbortSignal } = require_validators(); var { isIterable, isReadable, isReadableNodeStream, isNodeStream } = require_utils(); var PassThrough; - var Readable; function destroyer(stream, reading, writing) { let finished = false; stream.on("close", () => { @@ -4640,9 +4633,6 @@ var require_pipeline = __commonJS({ throw new ERR_INVALID_ARG_TYPE("val", ["Readable", "Iterable", "AsyncIterable"], val); } async function* fromReadable(val) { - if (!Readable) { - Readable = require_readable(); - } yield* Readable.prototype[SymbolAsyncIterator].$call(val); } async function pump(iterable, writable, finish, { end }) { @@ -4705,7 +4695,7 @@ var require_pipeline = __commonJS({ return pipelineImpl(streams, once(popCallback(streams))); } function pipelineImpl(streams, callback, opts) { - if (streams.length === 1 && ArrayIsArray(streams[0])) { + if (streams.length === 1 && $isJSArray(streams[0])) { streams = streams[0]; } if (streams.length < 2) { @@ -5471,7 +5461,9 @@ function getNativeReadableStream(Readable, stream, options) { } /** --- Bun native stream wrapper --- */ +var Readable = require_readable(); var Writable = require_writable(); +var Duplex = require_duplex(); const _pathOrFdOrSink = Symbol("pathOrFdOrSink"); const _fileSink = Symbol("fileSink"); diff --git a/test/js/node/child_process/child-process-stdio.test.js b/test/js/node/child_process/child-process-stdio.test.js index d0c6d9bc756178..666f59fc282923 100644 --- a/test/js/node/child_process/child-process-stdio.test.js +++ b/test/js/node/child_process/child-process-stdio.test.js @@ -110,3 +110,42 @@ describe("process.stdin", () => { expect(result).toEqual("data: File read successfully"); }); }); + +describe("process.stdio pipes", () => { + it("is writable", () => { + const child = spawn(bunExe(), [import.meta.dir + "/fixtures/child-process-pipe-read.js"], { + env: bunEnv, + stdio: ["pipe", "pipe", "pipe", "pipe"], + }); + const pipe = child.stdio[3]; + expect(pipe).not.toBe(null); + pipe.write("stdout_test"); + + child.stdout.on("data", data => { + try { + expect(data).toBe("stdout_test"); + done(); + } catch (err) { + done(err); + } + }); + }); + + it("is readable", () => { + const child = spawn(bunExe(), [import.meta.dir + "/fixtures/child-process-pipe-read.js"], { + env: bunEnv, + stdio: ["pipe", "pipe", "pipe", "pipe"], + }); + const pipe = child.stdio[3]; + expect(pipe).not.toBe(null); + + child.stdout.on("data", data => { + try { + expect(data).toBe("stdout_test"); + done(); + } catch (err) { + done(err); + } + }); + }); +}); diff --git a/test/js/node/child_process/fixtures/child-process-pipe-read.js b/test/js/node/child_process/fixtures/child-process-pipe-read.js new file mode 100644 index 00000000000000..522c0080fcb4e0 --- /dev/null +++ b/test/js/node/child_process/fixtures/child-process-pipe-read.js @@ -0,0 +1,16 @@ +import { sleep } from "bun"; + +const fs = require("node:fs"); + +const stream = fs.createReadStream(null, { fd: 3 }); +var have_read = false; + +stream.on("ready", () => { + console.log(stream.read()); + have_read = true; +}); + +while (true) { + await sleep(250); + if (have_read) break; +} diff --git a/test/js/node/child_process/fixtures/child-process-pipe-write.js b/test/js/node/child_process/fixtures/child-process-pipe-write.js new file mode 100644 index 00000000000000..a10e0a4900c8d9 --- /dev/null +++ b/test/js/node/child_process/fixtures/child-process-pipe-write.js @@ -0,0 +1,7 @@ +const fs = require("node:fs"); + +const stream = fs.createWriteStream(null, { fd: 3 }); + +stream.on("ready", () => { + stream.write("stdout_test\n"); +}); diff --git a/test/js/web/web-globals.test.js b/test/js/web/web-globals.test.js index 3695b30d968b42..818cdcdb452b70 100644 --- a/test/js/web/web-globals.test.js +++ b/test/js/web/web-globals.test.js @@ -237,9 +237,7 @@ test("navigator", () => { test("confirm (yes)", async () => { const proc = spawn({ cmd: [bunExe(), require("path").join(import.meta.dir, "./confirm-fixture.js")], - stderr: "pipe", - stdin: "pipe", - stdout: "pipe", + stdio: ["pipe", "pipe", "pipe"], env: bunEnv, }); @@ -257,9 +255,7 @@ test("confirm (yes)", async () => { test("confirm (no)", async () => { const proc = spawn({ cmd: [bunExe(), require("path").join(import.meta.dir, "./confirm-fixture.js")], - stderr: "pipe", - stdin: "pipe", - stdout: "pipe", + stdio: ["pipe", "pipe", "pipe"], env: bunEnv, });