Skip to content

Commit

Permalink
src: refactor WriteWrap and ShutdownWraps
Browse files Browse the repository at this point in the history
Encapsulate stream requests more:

- `WriteWrap` and `ShutdownWrap` classes are now tailored to the
  streams on which they are used. In particular, for most streams
  these are now plain `AsyncWrap`s and do not carry the overhead
  of unused libuv request data.
- Provide generic `Write()` and `Shutdown()` methods that wrap
  around the actual implementations, and make *usage* of streams
  easier, rather than implementing; for example, wrap objects
  don’t need to be provided by callers anymore.
- Use `EmitAfterWrite()` and `EmitAfterShutdown()` handlers to
  call the corresponding JS handlers, rather than always trying
  to call them. This makes usage of streams by other C++ code
  easier and leaner.

Also fix up some tests that were previously not actually testing
asynchronicity when the comments indicated that they would.

PR-URL: #18676
Reviewed-By: Ben Noordhuis <[email protected]>
Reviewed-By: Anatoli Papirovski <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
addaleax committed Feb 14, 2018
1 parent 0ed9ea8 commit 0e7b612
Show file tree
Hide file tree
Showing 20 changed files with 556 additions and 427 deletions.
2 changes: 1 addition & 1 deletion benchmark/net/tcp-raw-c2s.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ function client(type, len) {
fail(err, 'write');
}

function afterWrite(err, handle, req) {
function afterWrite(err, handle) {
if (err)
fail(err, 'write');

Expand Down
4 changes: 2 additions & 2 deletions benchmark/net/tcp-raw-pipe.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ function main({ dur, len, type }) {
if (err)
fail(err, 'write');

writeReq.oncomplete = function(status, handle, req, err) {
writeReq.oncomplete = function(status, handle, err) {
if (err)
fail(err, 'write');
};
Expand Down Expand Up @@ -130,7 +130,7 @@ function main({ dur, len, type }) {
fail(err, 'write');
}

function afterWrite(err, handle, req) {
function afterWrite(err, handle) {
if (err)
fail(err, 'write');

Expand Down
8 changes: 4 additions & 4 deletions benchmark/net/tcp-raw-s2c.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ function main({ dur, len, type }) {
fail(err, 'write');
} else if (!writeReq.async) {
process.nextTick(function() {
afterWrite(null, clientHandle, writeReq);
afterWrite(0, clientHandle);
});
}
}

function afterWrite(status, handle, req, err) {
if (err)
fail(err, 'write');
function afterWrite(status, handle) {
if (status)
fail(status, 'write');

while (clientHandle.writeQueueSize === 0)
write();
Expand Down
9 changes: 4 additions & 5 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -1399,20 +1399,19 @@ function trackWriteState(stream, bytes) {
session[kHandle].chunksSentSinceLastWrite = 0;
}

function afterDoStreamWrite(status, handle, req) {
function afterDoStreamWrite(status, handle) {
const stream = handle[kOwner];
const session = stream[kSession];

stream[kUpdateTimer]();

const { bytes } = req;
const { bytes } = this;
stream[kState].writeQueueSize -= bytes;

if (session !== undefined)
session[kState].writeQueueSize -= bytes;
if (typeof req.callback === 'function')
req.callback(null);
req.handle = undefined;
if (typeof this.callback === 'function')
this.callback(null);
}

function streamOnResume() {
Expand Down
6 changes: 3 additions & 3 deletions lib/internal/wrap_js_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ class JSStreamWrap extends Socket {

const handle = this._handle;

this.stream.end(() => {
// Ensure that write was dispatched
setImmediate(() => {
setImmediate(() => {
// Ensure that write is dispatched asynchronously.
this.stream.end(() => {
this.finishShutdown(handle, 0);
});
});
Expand Down
14 changes: 7 additions & 7 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ function onSocketFinish() {
}


function afterShutdown(status, handle, req) {
function afterShutdown(status, handle) {
var self = handle.owner;

debug('afterShutdown destroyed=%j', self.destroyed,
Expand Down Expand Up @@ -869,12 +869,12 @@ protoGetter('bytesWritten', function bytesWritten() {
});


function afterWrite(status, handle, req, err) {
function afterWrite(status, handle, err) {
var self = handle.owner;
if (self !== process.stderr && self !== process.stdout)
debug('afterWrite', status);

if (req.async)
if (this.async)
self[kLastWriteQueueSize] = 0;

// callback may come after call to destroy.
Expand All @@ -884,9 +884,9 @@ function afterWrite(status, handle, req, err) {
}

if (status < 0) {
var ex = errnoException(status, 'write', req.error);
var ex = errnoException(status, 'write', this.error);
debug('write failure', ex);
self.destroy(ex, req.cb);
self.destroy(ex, this.cb);
return;
}

Expand All @@ -895,8 +895,8 @@ function afterWrite(status, handle, req, err) {
if (self !== process.stderr && self !== process.stdout)
debug('afterWrite call cb');

if (req.cb)
req.cb.call(undefined);
if (this.cb)
this.cb.call(undefined);
}


Expand Down
1 change: 1 addition & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ class ModuleWrap;
V(script_context_constructor_template, v8::FunctionTemplate) \
V(script_data_constructor_function, v8::Function) \
V(secure_context_constructor_template, v8::FunctionTemplate) \
V(shutdown_wrap_constructor_function, v8::Function) \
V(tcp_constructor_template, v8::FunctionTemplate) \
V(tick_callback_function, v8::Function) \
V(timers_callback_function, v8::Function) \
Expand Down
7 changes: 1 addition & 6 deletions src/js_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
req_wrap->object()
};

req_wrap->Dispatched();

TryCatch try_catch(env()->isolate());
Local<Value> value;
int value_int = UV_EPROTO;
Expand Down Expand Up @@ -127,8 +125,6 @@ int JSStream::DoWrite(WriteWrap* w,
bufs_arr
};

w->Dispatched();

TryCatch try_catch(env()->isolate());
Local<Value> value;
int value_int = UV_EPROTO;
Expand All @@ -154,9 +150,8 @@ void JSStream::New(const FunctionCallbackInfo<Value>& args) {

template <class Wrap>
void JSStream::Finish(const FunctionCallbackInfo<Value>& args) {
Wrap* w;
CHECK(args[0]->IsObject());
ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>());
Wrap* w = static_cast<Wrap*>(StreamReq::FromObject(args[0].As<Object>()));

w->Done(args[1]->Int32Value());
}
Expand Down
44 changes: 10 additions & 34 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1552,18 +1552,9 @@ void Http2Session::SendPendingData() {

chunks_sent_since_last_write_++;

// DoTryWrite may modify both the buffer list start itself and the
// base pointers/length of the individual buffers.
uv_buf_t* writebufs = *bufs;
if (stream_->DoTryWrite(&writebufs, &count) != 0 || count == 0) {
// All writes finished synchronously, nothing more to do here.
ClearOutgoing(0);
return;
}

WriteWrap* req = AllocateSend();
if (stream_->DoWrite(req, writebufs, count, nullptr) != 0) {
req->Dispose();
StreamWriteResult res = underlying_stream()->Write(*bufs, count);
if (!res.async) {
ClearOutgoing(res.err);
}

DEBUG_HTTP2SESSION2(this, "wants data in return? %d",
Expand Down Expand Up @@ -1649,15 +1640,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) {
chunks_sent_since_last_write_ = n;
}

// Allocates the data buffer used to pass outbound data to the i/o stream.
WriteWrap* Http2Session::AllocateSend() {
HandleScope scope(env()->isolate());
Local<Object> obj =
env()->write_wrap_constructor_function()
->NewInstance(env()->context()).ToLocalChecked();
return WriteWrap::New(env(), obj, static_cast<StreamBase*>(stream_));
}

// Callback used to receive inbound data from the i/o stream
void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
Http2Scope h2scope(this);
Expand Down Expand Up @@ -1833,20 +1815,15 @@ inline void Http2Stream::Close(int32_t code) {
DEBUG_HTTP2STREAM2(this, "closed with code %d", code);
}


inline void Http2Stream::Shutdown() {
CHECK(!this->IsDestroyed());
Http2Scope h2scope(this);
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
CHECK_NE(nghttp2_session_resume_data(session_->session(), id_),
NGHTTP2_ERR_NOMEM);
DEBUG_HTTP2STREAM(this, "writable side shutdown");
}

int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) {
CHECK(!this->IsDestroyed());
req_wrap->Dispatched();
Shutdown();
{
Http2Scope h2scope(this);
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
CHECK_NE(nghttp2_session_resume_data(session_->session(), id_),
NGHTTP2_ERR_NOMEM);
DEBUG_HTTP2STREAM(this, "writable side shutdown");
}
req_wrap->Done(0);
return 0;
}
Expand Down Expand Up @@ -2038,7 +2015,6 @@ inline int Http2Stream::DoWrite(WriteWrap* req_wrap,
CHECK_EQ(send_handle, nullptr);
Http2Scope h2scope(this);
session_->SetChunksSinceLastWrite();
req_wrap->Dispatched();
if (!IsWritable()) {
req_wrap->Done(UV_EOF);
return 0;
Expand Down
9 changes: 4 additions & 5 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -601,9 +601,6 @@ class Http2Stream : public AsyncWrap,

inline void Close(int32_t code);

// Shutdown the writable side of the stream
inline void Shutdown();

// Destroy this stream instance and free all held memory.
inline void Destroy();

Expand Down Expand Up @@ -818,6 +815,10 @@ class Http2Session : public AsyncWrap, public StreamListener {

inline void EmitStatistics();

inline StreamBase* underlying_stream() {
return static_cast<StreamBase*>(stream_);
}

void Start();
void Stop();
void Close(uint32_t code = NGHTTP2_NO_ERROR,
Expand Down Expand Up @@ -907,8 +908,6 @@ class Http2Session : public AsyncWrap, public StreamListener {
template <get_setting fn>
static void GetSettings(const FunctionCallbackInfo<Value>& args);

WriteWrap* AllocateSend();

uv_loop_t* event_loop() const {
return env()->event_loop();
}
Expand Down
5 changes: 5 additions & 0 deletions src/req_wrap-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ void ReqWrap<T>::Dispatched() {
req_.data = this;
}

template <typename T>
ReqWrap<T>* ReqWrap<T>::from_req(T* req) {
return ContainerOf(&ReqWrap<T>::req_, req);
}

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
Expand Down
2 changes: 2 additions & 0 deletions src/req_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class ReqWrap : public AsyncWrap {
inline void Dispatched(); // Call this after the req has been dispatched.
T* req() { return &req_; }

static ReqWrap* from_req(T* req);

private:
friend class Environment;
friend int GenDebugSymbols();
Expand Down
Loading

0 comments on commit 0e7b612

Please sign in to comment.