Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async_wrap: add provider types for net server #17157

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benchmark/net/tcp-raw-c2s.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const bench = common.createBenchmark(main, {
dur: [5]
});

const TCP = process.binding('tcp_wrap').TCP;
const { TCP, constants: TCPConstants } = process.binding('tcp_wrap');
const TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap;
const WriteWrap = process.binding('stream_wrap').WriteWrap;
const PORT = common.PORT;
Expand All @@ -36,7 +36,7 @@ function fail(err, syscall) {
}

function server() {
const serverHandle = new TCP();
const serverHandle = new TCP(TCPConstants.SERVER);
var err = serverHandle.bind('127.0.0.1', PORT);
if (err)
fail(err, 'bind');
Expand Down Expand Up @@ -92,7 +92,7 @@ function client() {
throw new Error(`invalid type: ${type}`);
}

const clientHandle = new TCP();
const clientHandle = new TCP(TCPConstants.SOCKET);
const connectReq = new TCPConnectWrap();
const err = clientHandle.connect(connectReq, '127.0.0.1', PORT);

Expand Down
6 changes: 3 additions & 3 deletions benchmark/net/tcp-raw-pipe.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const bench = common.createBenchmark(main, {
dur: [5]
});

const TCP = process.binding('tcp_wrap').TCP;
const { TCP, constants: TCPConstants } = process.binding('tcp_wrap');
const TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap;
const WriteWrap = process.binding('stream_wrap').WriteWrap;
const PORT = common.PORT;
Expand All @@ -35,7 +35,7 @@ function fail(err, syscall) {
}

function server() {
const serverHandle = new TCP();
const serverHandle = new TCP(TCPConstants.SERVER);
var err = serverHandle.bind('127.0.0.1', PORT);
if (err)
fail(err, 'bind');
Expand Down Expand Up @@ -89,7 +89,7 @@ function client() {
throw new Error(`invalid type: ${type}`);
}

const clientHandle = new TCP();
const clientHandle = new TCP(TCPConstants.SOCKET);
const connectReq = new TCPConnectWrap();
const err = clientHandle.connect(connectReq, '127.0.0.1', PORT);
var bytes = 0;
Expand Down
6 changes: 3 additions & 3 deletions benchmark/net/tcp-raw-s2c.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const bench = common.createBenchmark(main, {
dur: [5]
});

const TCP = process.binding('tcp_wrap').TCP;
const { TCP, constants: TCPConstants } = process.binding('tcp_wrap');
const TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap;
const WriteWrap = process.binding('stream_wrap').WriteWrap;
const PORT = common.PORT;
Expand All @@ -35,7 +35,7 @@ function fail(err, syscall) {
}

function server() {
const serverHandle = new TCP();
const serverHandle = new TCP(TCPConstants.SERVER);
var err = serverHandle.bind('127.0.0.1', PORT);
if (err)
fail(err, 'bind');
Expand Down Expand Up @@ -107,7 +107,7 @@ function server() {
}

function client() {
const clientHandle = new TCP();
const clientHandle = new TCP(TCPConstants.SOCKET);
const connectReq = new TCPConnectWrap();
const err = clientHandle.connect(connectReq, '127.0.0.1', PORT);

Expand Down
12 changes: 6 additions & 6 deletions doc/api/async_hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ resource's constructor.
```text
FSEVENTWRAP, FSREQWRAP, GETADDRINFOREQWRAP, GETNAMEINFOREQWRAP, HTTPPARSER,
JSSTREAM, PIPECONNECTWRAP, PIPEWRAP, PROCESSWRAP, QUERYWRAP, SHUTDOWNWRAP,
SIGNALWRAP, STATWATCHER, TCPCONNECTWRAP, TCPWRAP, TIMERWRAP, TTYWRAP,
SIGNALWRAP, STATWATCHER, TCPCONNECTWRAP, TCPSERVER, TCPWRAP, TIMERWRAP, TTYWRAP,
UDPSENDWRAP, UDPWRAP, WRITEWRAP, ZLIB, SSLCONNECTION, PBKDF2REQUEST,
RANDOMBYTESREQUEST, TLSWRAP, Timeout, Immediate, TickObject
```
Expand Down Expand Up @@ -275,13 +275,13 @@ require('net').createServer((conn) => {}).listen(8080);
Output when hitting the server with `nc localhost 8080`:

```console
TCPWRAP(2): trigger: 1 execution: 1
TCPSERVERWRAP(2): trigger: 1 execution: 1
TCPWRAP(4): trigger: 2 execution: 0
```

The first `TCPWRAP` is the server which receives the connections.
The `TCPSERVERWRAP` is the server which receives the connections.

The second `TCPWRAP` is the new connection from the client. When a new
The `TCPWRAP` is the new connection from the client. When a new
connection is made the `TCPWrap` instance is immediately constructed. This
happens outside of any JavaScript stack (side note: a `executionAsyncId()` of `0`
means it's being executed from C++, with no JavaScript stack above it).
Expand Down Expand Up @@ -354,7 +354,7 @@ require('net').createServer(() => {}).listen(8080, () => {
Output from only starting the server:

```console
TCPWRAP(2): trigger: 1 execution: 1
TCPSERVERWRAP(2): trigger: 1 execution: 1
TickObject(3): trigger: 2 execution: 1
before: 3
Timeout(4): trigger: 3 execution: 3
Expand Down Expand Up @@ -387,7 +387,7 @@ Only using `execution` to graph resource allocation results in the following:
TTYWRAP(6) -> Timeout(4) -> TIMERWRAP(5) -> TickObject(3) -> root(1)
```

The `TCPWRAP` is not part of this graph, even though it was the reason for
The `TCPSERVERWRAP` is not part of this graph, even though it was the reason for
`console.log()` being called. This is because binding to a port without a
hostname is a *synchronous* operation, but to maintain a completely asynchronous
API the user's callback is placed in a `process.nextTick()`.
Expand Down
8 changes: 5 additions & 3 deletions lib/_tls_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ const { Buffer } = require('buffer');
const debug = util.debuglog('tls');
const { Timer } = process.binding('timer_wrap');
const tls_wrap = process.binding('tls_wrap');
const { TCP } = process.binding('tcp_wrap');
const { Pipe } = process.binding('pipe_wrap');
const { TCP, constants: TCPConstants } = process.binding('tcp_wrap');
const { Pipe, constants: PipeConstants } = process.binding('pipe_wrap');
const errors = require('internal/errors');
const kConnectOptions = Symbol('connect-options');
const kDisableRenegotiation = Symbol('disable-renegotiation');
Expand Down Expand Up @@ -398,7 +398,9 @@ TLSSocket.prototype._wrapHandle = function(wrap) {

var options = this._tlsOptions;
if (!handle) {
handle = options.pipe ? new Pipe() : new TCP();
handle = options.pipe ?
new Pipe(PipeConstants.SOCKET) :
new TCP(TCPConstants.SOCKET);
handle.owner = this;
}

Expand Down
4 changes: 2 additions & 2 deletions lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const { createPromise,
promiseResolve, promiseReject } = process.binding('util');
const debug = util.debuglog('child_process');
const { Buffer } = require('buffer');
const { Pipe } = process.binding('pipe_wrap');
const { Pipe, constants: PipeConstants } = process.binding('pipe_wrap');
const { errname } = process.binding('uv');
const child_process = require('internal/child_process');
const {
Expand Down Expand Up @@ -103,7 +103,7 @@ exports.fork = function(modulePath /*, args, options*/) {

exports._forkChild = function(fd) {
// set process.send()
var p = new Pipe(true);
var p = new Pipe(PipeConstants.IPC);
p.open(fd);
p.unref();
const control = setupChannel(process, p);
Expand Down
6 changes: 3 additions & 3 deletions lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const assert = require('assert');

const { Process } = process.binding('process_wrap');
const { WriteWrap } = process.binding('stream_wrap');
const { Pipe } = process.binding('pipe_wrap');
const { Pipe, constants: PipeConstants } = process.binding('pipe_wrap');
const { TTY } = process.binding('tty_wrap');
const { TCP } = process.binding('tcp_wrap');
const { UDP } = process.binding('udp_wrap');
Expand Down Expand Up @@ -863,7 +863,7 @@ function _validateStdio(stdio, sync) {
};

if (!sync)
a.handle = new Pipe();
a.handle = new Pipe(PipeConstants.SOCKET);

acc.push(a);
} else if (stdio === 'ipc') {
Expand All @@ -876,7 +876,7 @@ function _validateStdio(stdio, sync) {
throw new errors.Error('ERR_IPC_SYNC_FORK');
}

ipc = new Pipe(true);
ipc = new Pipe(PipeConstants.IPC);
ipcFd = i;

acc.push({
Expand Down
32 changes: 22 additions & 10 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ const {

const { Buffer } = require('buffer');
const TTYWrap = process.binding('tty_wrap');
const { TCP } = process.binding('tcp_wrap');
const { Pipe } = process.binding('pipe_wrap');
const { TCP, constants: TCPConstants } = process.binding('tcp_wrap');
const { Pipe, constants: PipeConstants } = process.binding('pipe_wrap');
const { TCPConnectWrap } = process.binding('tcp_wrap');
const { PipeConnectWrap } = process.binding('pipe_wrap');
const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap');
Expand All @@ -57,10 +57,20 @@ const exceptionWithHostPort = util._exceptionWithHostPort;

function noop() {}

function createHandle(fd) {
function createHandle(fd, is_server) {
const type = TTYWrap.guessHandleType(fd);
if (type === 'PIPE') return new Pipe();
if (type === 'TCP') return new TCP();
if (type === 'PIPE') {
return new Pipe(
is_server ? PipeConstants.SERVER : PipeConstants.SOCKET
);
}

if (type === 'TCP') {
return new TCP(
is_server ? TCPConstants.SERVER : TCPConstants.SOCKET
);
}

throw new errors.TypeError('ERR_INVALID_FD_TYPE', type);
}

Expand Down Expand Up @@ -200,7 +210,7 @@ function Socket(options) {
this._handle = options.handle; // private
this[async_id_symbol] = getNewAsyncId(this._handle);
} else if (options.fd !== undefined) {
this._handle = createHandle(options.fd);
this._handle = createHandle(options.fd, false);
this._handle.open(options.fd);
this[async_id_symbol] = this._handle.getAsyncId();
// options.fd can be string (since it is user-defined),
Expand Down Expand Up @@ -1009,7 +1019,9 @@ Socket.prototype.connect = function(...args) {
debug('pipe', pipe, path);

if (!this._handle) {
this._handle = pipe ? new Pipe() : new TCP();
this._handle = pipe ?
new Pipe(PipeConstants.SOCKET) :
new TCP(TCPConstants.SOCKET);
initSocketHandle(this);
}

Expand Down Expand Up @@ -1269,7 +1281,7 @@ function createServerHandle(address, port, addressType, fd) {
var isTCP = false;
if (typeof fd === 'number' && fd >= 0) {
try {
handle = createHandle(fd);
handle = createHandle(fd, true);
} catch (e) {
// Not a fd we can listen on. This will trigger an error.
debug('listen invalid fd=%d:', fd, e.message);
Expand All @@ -1280,15 +1292,15 @@ function createServerHandle(address, port, addressType, fd) {
handle.writable = true;
assert(!address && !port);
} else if (port === -1 && addressType === -1) {
handle = new Pipe();
handle = new Pipe(PipeConstants.SERVER);
if (process.platform === 'win32') {
var instances = parseInt(process.env.NODE_PENDING_PIPE_INSTANCES);
if (!isNaN(instances)) {
handle.setPendingInstances(instances);
}
}
} else {
handle = new TCP();
handle = new TCP(TCPConstants.SERVER);
isTCP = true;
}

Expand Down
2 changes: 2 additions & 0 deletions src/async_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ namespace node {
V(HTTPPARSER) \
V(JSSTREAM) \
V(PIPECONNECTWRAP) \
V(PIPESERVERWRAP) \
V(PIPEWRAP) \
V(PROCESSWRAP) \
V(PROMISE) \
Expand All @@ -54,6 +55,7 @@ namespace node {
V(SIGNALWRAP) \
V(STATWATCHER) \
V(TCPCONNECTWRAP) \
V(TCPSERVERWRAP) \
V(TCPWRAP) \
V(TIMERWRAP) \
V(TTYWRAP) \
Expand Down
4 changes: 3 additions & 1 deletion src/connection_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ void ConnectionWrap<WrapType, UVType>::OnConnection(uv_stream_t* handle,
if (status == 0) {
env->set_init_trigger_async_id(wrap_data->get_async_id());
// Instantiate the client javascript object and handle.
Local<Object> client_obj = WrapType::Instantiate(env, wrap_data);
Local<Object> client_obj = WrapType::Instantiate(env,
wrap_data,
WrapType::SOCKET);

// Unwrap the client javascript object.
WrapType* wrap;
Expand Down
48 changes: 42 additions & 6 deletions src/pipe_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Int32;
using v8::Local;
using v8::Object;
using v8::String;
Expand All @@ -48,14 +49,17 @@ using v8::Value;
using AsyncHooks = Environment::AsyncHooks;


Local<Object> PipeWrap::Instantiate(Environment* env, AsyncWrap* parent) {
Local<Object> PipeWrap::Instantiate(Environment* env,
AsyncWrap* parent,
PipeWrap::SocketType type) {
EscapableHandleScope handle_scope(env->isolate());
AsyncHooks::InitScope init_scope(env, parent->get_async_id());
CHECK_EQ(false, env->pipe_constructor_template().IsEmpty());
Local<Function> constructor = env->pipe_constructor_template()->GetFunction();
CHECK_EQ(false, constructor.IsEmpty());
Local<Value> type_value = Int32::New(env->isolate(), type);
Local<Object> instance =
constructor->NewInstance(env->context()).ToLocalChecked();
constructor->NewInstance(env->context(), 1, &type_value).ToLocalChecked();
return handle_scope.Escape(instance);
}

Expand Down Expand Up @@ -107,6 +111,15 @@ void PipeWrap::Initialize(Local<Object> target,
FIXED_ONE_BYTE_STRING(env->isolate(), "PipeConnectWrap");
cwt->SetClassName(wrapString);
target->Set(wrapString, cwt->GetFunction());

// Define constants
Local<Object> constants = Object::New(env->isolate());
NODE_DEFINE_CONSTANT(constants, SOCKET);
NODE_DEFINE_CONSTANT(constants, SERVER);
NODE_DEFINE_CONSTANT(constants, IPC);
target->Set(context,
FIXED_ONE_BYTE_STRING(env->isolate(), "constants"),
constants).FromJust();
}


Expand All @@ -115,17 +128,40 @@ void PipeWrap::New(const FunctionCallbackInfo<Value>& args) {
// Therefore we assert that we are not trying to call this as a
// normal function.
CHECK(args.IsConstructCall());
CHECK(args[0]->IsInt32());
Environment* env = Environment::GetCurrent(args);
new PipeWrap(env, args.This(), args[0]->IsTrue());

int type_value = args[0].As<Int32>()->Value();
PipeWrap::SocketType type = static_cast<PipeWrap::SocketType>(type_value);

bool ipc;
ProviderType provider;
switch (type) {
case SOCKET:
provider = PROVIDER_PIPEWRAP;
ipc = false;
break;
case SERVER:
provider = PROVIDER_PIPESERVERWRAP;
ipc = false;
break;
case IPC:
provider = PROVIDER_PIPEWRAP;
ipc = true;
break;
default:
UNREACHABLE();
}

new PipeWrap(env, args.This(), provider, ipc);
}


PipeWrap::PipeWrap(Environment* env,
Local<Object> object,
ProviderType provider,
bool ipc)
: ConnectionWrap(env,
object,
AsyncWrap::PROVIDER_PIPEWRAP) {
: ConnectionWrap(env, object, provider) {
int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
// Suggestion: uv_pipe_init() returns void.
Expand Down
Loading