Skip to content

Commit

Permalink
http2: refactor how trailers are done
Browse files Browse the repository at this point in the history
Rather than an option, introduce a method and an event...

```js
server.on('stream', (stream) => {
  stream.respond(undefined, { waitForTrailers: true });
  stream.on('wantTrailers', () => {
    stream.sendTrailers({ abc: 'xyz'});
  });
  stream.end('hello world');
});
```

This is a breaking change in the API such that the prior
`options.getTrailers` is no longer supported at all.
Ordinarily this would be semver-major and require a
deprecation but the http2 stuff is still experimental.

PR-URL: nodejs#19959
Reviewed-By: Yuta Hiroto <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
jasnell authored and kjin committed Aug 23, 2018
1 parent d2691d6 commit dbea72e
Show file tree
Hide file tree
Showing 17 changed files with 329 additions and 308 deletions.
13 changes: 13 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,19 @@ When setting the priority for an HTTP/2 stream, the stream may be marked as
a dependency for a parent stream. This error code is used when an attempt is
made to mark a stream and dependent of itself.

<a id="ERR_HTTP2_TRAILERS_ALREADY_SENT"></a>
### ERR_HTTP2_TRAILERS_ALREADY_SENT

Trailing headers have already been sent on the `Http2Stream`.

<a id="ERR_HTTP2_TRAILERS_NOT_READY"></a>
### ERR_HTTP2_TRAILERS_NOT_READY

The `http2stream.sendTrailers()` method cannot be called until after the
`'wantTrailers'` event is emitted on an `Http2Stream` object. The
`'wantTrailers'` event will only be emitted if the `waitForTrailers` option
is set for the `Http2Stream`.

<a id="ERR_HTTP2_UNSUPPORTED_PROTOCOL"></a>
### ERR_HTTP2_UNSUPPORTED_PROTOCOL

Expand Down
198 changes: 117 additions & 81 deletions doc/api/http2.md

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,12 @@ E('ERR_HTTP2_STATUS_INVALID', 'Invalid status code: %s');
E('ERR_HTTP2_STREAM_CANCEL', 'The pending stream has been canceled');
E('ERR_HTTP2_STREAM_ERROR', 'Stream closed with error code %s');
E('ERR_HTTP2_STREAM_SELF_DEPENDENCY', 'A stream cannot depend on itself');
E('ERR_HTTP2_UNSUPPORTED_PROTOCOL',
(protocol) => `protocol "${protocol}" is unsupported.`);
E('ERR_HTTP2_TRAILERS_ALREADY_SENT',
'Trailing headers have already been sent');
E('ERR_HTTP2_TRAILERS_NOT_READY',
'Trailing headers cannot be sent until after the wantTrailers event is ' +
'emitted');
E('ERR_HTTP2_UNSUPPORTED_PROTOCOL', 'protocol "%s" is unsupported.');
E('ERR_HTTP_HEADERS_SENT',
'Cannot render headers after they are sent to the client');
E('ERR_HTTP_INVALID_CHAR', 'Invalid character in statusMessage.');
Expand Down
7 changes: 6 additions & 1 deletion lib/internal/http2/compat.js
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ class Http2ServerRequest extends Readable {
}
}

function onStreamTrailersReady() {
this[kStream].sendTrailers(this[kTrailers]);
}

class Http2ServerResponse extends Stream {
constructor(stream, options) {
super(options);
Expand All @@ -363,6 +367,7 @@ class Http2ServerResponse extends Stream {
stream.on('drain', onStreamDrain);
stream.on('aborted', onStreamAbortedResponse);
stream.on('close', this[kFinish].bind(this));
stream.on('wantTrailers', onStreamTrailersReady.bind(this));
}

// User land modules such as finalhandler just check truthiness of this
Expand Down Expand Up @@ -632,7 +637,7 @@ class Http2ServerResponse extends Stream {
headers[HTTP2_HEADER_STATUS] = state.statusCode;
const options = {
endStream: state.ending,
getTrailers: (trailers) => Object.assign(trailers, this[kTrailers])
waitForTrailers: true,
};
this[kStream].respond(headers, options);
}
Expand Down
91 changes: 41 additions & 50 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,25 +245,18 @@ function tryClose(fd) {
fs.close(fd, (err) => assert.ifError(err));
}

// Called to determine if there are trailers to be sent at the end of a
// Stream. The 'getTrailers' callback is invoked and passed a holder object.
// The trailers to return are set on that object by the handler. Once the
// event handler returns, those are sent off for processing. Note that this
// is a necessarily synchronous operation. We need to know immediately if
// there are trailing headers to send.
// Called when the Http2Stream has finished sending data and is ready for
// trailers to be sent. This will only be called if the { hasOptions: true }
// option is set.
function onStreamTrailers() {
const stream = this[kOwner];
stream[kState].trailersReady = true;
if (stream.destroyed)
return [];
const trailers = Object.create(null);
stream[kState].getTrailers.call(stream, trailers);
const headersList = mapToHeaders(trailers, assertValidPseudoHeaderTrailer);
if (!Array.isArray(headersList)) {
stream.destroy(headersList);
return [];
return;
if (!stream.emit('wantTrailers')) {
// There are no listeners, send empty trailing HEADERS frame and close.
stream.sendTrailers({});
}
stream[kSentTrailers] = trailers;
return headersList;
}

// Submit an RST-STREAM frame to be sent to the remote peer.
Expand Down Expand Up @@ -479,10 +472,8 @@ function requestOnConnect(headers, options) {
if (options.endStream)
streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD;

if (typeof options.getTrailers === 'function') {
if (options.waitForTrailers)
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].getTrailers = options.getTrailers;
}

// ret will be either the reserved stream ID (if positive)
// or an error code (if negative)
Expand Down Expand Up @@ -1367,13 +1358,6 @@ class ClientHttp2Session extends Http2Session {
options.endStream);
}

if (options.getTrailers !== undefined &&
typeof options.getTrailers !== 'function') {
throw new errors.TypeError('ERR_INVALID_OPT_VALUE',
'getTrailers',
options.getTrailers);
}

const headersList = mapToHeaders(headers);
if (!Array.isArray(headersList))
throw headersList;
Expand Down Expand Up @@ -1486,7 +1470,8 @@ class Http2Stream extends Duplex {
this[kState] = {
flags: STREAM_FLAGS_PENDING,
rstCode: NGHTTP2_NO_ERROR,
writeQueueSize: 0
writeQueueSize: 0,
trailersReady: false
};

this.on('resume', streamOnResume);
Expand Down Expand Up @@ -1742,6 +1727,33 @@ class Http2Stream extends Duplex {
priorityFn();
}

sendTrailers(headers) {
if (this.destroyed || this.closed)
throw new errors.Error('ERR_HTTP2_INVALID_STREAM');
if (this[kSentTrailers])
throw new errors.Error('ERR_HTTP2_TRAILERS_ALREADY_SENT');
if (!this[kState].trailersReady)
throw new errors.Error('ERR_HTTP2_TRAILERS_NOT_READY');

assertIsObject(headers, 'headers');
headers = Object.assign(Object.create(null), headers);

const session = this[kSession];
debug(`Http2Stream ${this[kID]} [Http2Session ` +
`${sessionName(session[kType])}]: sending trailers`);

this[kUpdateTimer]();

const headersList = mapToHeaders(headers, assertValidPseudoHeaderTrailer);
if (!Array.isArray(headersList))
throw headersList;
this[kSentTrailers] = headers;

const ret = this[kHandle].trailers(headersList);
if (ret < 0)
this.destroy(new NghttpError(ret));
}

get closed() {
return !!(this[kState].flags & STREAM_FLAGS_CLOSED);
}
Expand Down Expand Up @@ -2169,15 +2181,8 @@ class ServerHttp2Stream extends Http2Stream {
if (options.endStream)
streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD;

if (options.getTrailers !== undefined) {
if (typeof options.getTrailers !== 'function') {
throw new errors.TypeError('ERR_INVALID_OPT_VALUE',
'getTrailers',
options.getTrailers);
}
if (options.waitForTrailers)
streamOptions |= STREAM_OPTION_GET_TRAILERS;
state.getTrailers = options.getTrailers;
}

headers = processHeaders(headers);
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
Expand Down Expand Up @@ -2243,15 +2248,8 @@ class ServerHttp2Stream extends Http2Stream {
}

let streamOptions = 0;
if (options.getTrailers !== undefined) {
if (typeof options.getTrailers !== 'function') {
throw new errors.TypeError('ERR_INVALID_OPT_VALUE',
'getTrailers',
options.getTrailers);
}
if (options.waitForTrailers)
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].getTrailers = options.getTrailers;
}

if (typeof fd !== 'number')
throw new errors.TypeError('ERR_INVALID_ARG_TYPE',
Expand Down Expand Up @@ -2317,15 +2315,8 @@ class ServerHttp2Stream extends Http2Stream {
}

let streamOptions = 0;
if (options.getTrailers !== undefined) {
if (typeof options.getTrailers !== 'function') {
throw new errors.TypeError('ERR_INVALID_OPT_VALUE',
'getTrailers',
options.getTrailers);
}
if (options.waitForTrailers)
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].getTrailers = options.getTrailers;
}

const session = this[kSession];
debug(`Http2Stream ${this[kID]} [Http2Session ` +
Expand Down
107 changes: 40 additions & 67 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1117,36 +1117,6 @@ inline int Http2Session::OnNghttpError(nghttp2_session* handle,
return 0;
}

// Once all of the DATA frames for a Stream have been sent, the GetTrailers
// method calls out to JavaScript to fetch the trailing headers that need
// to be sent.
inline void Http2Session::GetTrailers(Http2Stream* stream, uint32_t* flags) {
if (!stream->IsDestroyed() && stream->HasTrailers()) {
Http2Stream::SubmitTrailers submit_trailers{this, stream, flags};
stream->OnTrailers(submit_trailers);
}
}


Http2Stream::SubmitTrailers::SubmitTrailers(
Http2Session* session,
Http2Stream* stream,
uint32_t* flags)
: session_(session), stream_(stream), flags_(flags) { }


inline void Http2Stream::SubmitTrailers::Submit(nghttp2_nv* trailers,
size_t length) const {
Http2Scope h2scope(session_);
if (length == 0)
return;
DEBUG_HTTP2SESSION2(session_, "sending trailers for stream %d, count: %d",
stream_->id(), length);
*flags_ |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
CHECK_EQ(
nghttp2_submit_trailer(**session_, stream_->id(), trailers, length), 0);
}


// Called by OnFrameReceived to notify JavaScript land that a complete
// HEADERS frame has been received and processed. This method converts the
Expand Down Expand Up @@ -1808,29 +1778,6 @@ nghttp2_stream* Http2Stream::operator*() {
}


// Calls out to JavaScript land to fetch the actual trailer headers to send
// for this stream.
void Http2Stream::OnTrailers(const SubmitTrailers& submit_trailers) {
DEBUG_HTTP2STREAM(this, "prompting for trailers");
CHECK(!this->IsDestroyed());
Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
Local<Context> context = env()->context();
Context::Scope context_scope(context);

Local<Value> ret =
MakeCallback(env()->ontrailers_string(), 0, nullptr).ToLocalChecked();
if (!ret.IsEmpty() && !IsDestroyed()) {
if (ret->IsArray()) {
Local<Array> headers = ret.As<Array>();
if (headers->Length() > 0) {
Headers trailers(isolate, context, headers);
submit_trailers.Submit(*trailers, trailers.length());
}
}
}
}

inline void Http2Stream::Close(int32_t code) {
CHECK(!this->IsDestroyed());
flags_ |= NGHTTP2_STREAM_FLAG_CLOSED;
Expand Down Expand Up @@ -1952,6 +1899,26 @@ inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {
return ret;
}

void Http2Stream::OnTrailers() {
DEBUG_HTTP2STREAM(this, "let javascript know we are ready for trailers");
CHECK(!this->IsDestroyed());
Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
Local<Context> context = env()->context();
Context::Scope context_scope(context);
MakeCallback(env()->ontrailers_string(), 0, nullptr);
}

// Submit informational headers for a stream.
int Http2Stream::SubmitTrailers(nghttp2_nv* nva, size_t len) {
CHECK(!this->IsDestroyed());
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM2(this, "sending %d trailers", len);
int ret = nghttp2_submit_trailer(**session_, id_, nva, len);
CHECK_NE(ret, NGHTTP2_ERR_NOMEM);
return ret;
}

// Submit a PRIORITY frame to the connected peer.
inline int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec,
bool silent) {
Expand Down Expand Up @@ -2184,13 +2151,6 @@ ssize_t Http2Stream::Provider::FD::OnRead(nghttp2_session* handle,
if (static_cast<size_t>(numchars) < length || length <= 0) {
DEBUG_HTTP2SESSION2(session, "no more data for stream %d", id);
*flags |= NGHTTP2_DATA_FLAG_EOF;
session->GetTrailers(stream, flags);
// If the stream or session gets destroyed during the GetTrailers
// callback, check that here and close down the stream
if (stream->IsDestroyed())
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
if (session->IsDestroyed())
return NGHTTP2_ERR_CALLBACK_FAILURE;
}

stream->statistics_.sent_bytes += numchars;
Expand Down Expand Up @@ -2258,13 +2218,10 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle,
if (stream->queue_.empty() && !stream->IsWritable()) {
DEBUG_HTTP2SESSION2(session, "no more data for stream %d", id);
*flags |= NGHTTP2_DATA_FLAG_EOF;
session->GetTrailers(stream, flags);
// If the stream or session gets destroyed during the GetTrailers
// callback, check that here and close down the stream
if (stream->IsDestroyed())
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
if (session->IsDestroyed())
return NGHTTP2_ERR_CALLBACK_FAILURE;
if (stream->HasTrailers()) {
*flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
stream->OnTrailers();
}
}

stream->statistics_.sent_bytes += amount;
Expand Down Expand Up @@ -2574,6 +2531,21 @@ void Http2Stream::Info(const FunctionCallbackInfo<Value>& args) {
headers->Length());
}

// Submits trailing headers on the Http2Stream
void Http2Stream::Trailers(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Local<Context> context = env->context();
Isolate* isolate = env->isolate();
Http2Stream* stream;
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());

Local<Array> headers = args[0].As<Array>();

Headers list(isolate, context, headers);
args.GetReturnValue().Set(stream->SubmitTrailers(*list, list.length()));
DEBUG_HTTP2STREAM2(stream, "%d trailing headers sent", headers->Length());
}

// Grab the numeric id of the Http2Stream
void Http2Stream::GetID(const FunctionCallbackInfo<Value>& args) {
Http2Stream* stream;
Expand Down Expand Up @@ -2921,6 +2893,7 @@ void Initialize(Local<Object> target,
env->SetProtoMethod(stream, "priority", Http2Stream::Priority);
env->SetProtoMethod(stream, "pushPromise", Http2Stream::PushPromise);
env->SetProtoMethod(stream, "info", Http2Stream::Info);
env->SetProtoMethod(stream, "trailers", Http2Stream::Trailers);
env->SetProtoMethod(stream, "respondFD", Http2Stream::RespondFD);
env->SetProtoMethod(stream, "respond", Http2Stream::Respond);
env->SetProtoMethod(stream, "rstStream", Http2Stream::RstStream);
Expand Down
Loading

0 comments on commit dbea72e

Please sign in to comment.