Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src,http2: use native-layer pipe mechanism from FileHandle instead of synchronous I/O #18936

Closed
wants to merge 11 commits into from
2 changes: 1 addition & 1 deletion benchmark/http2/respond-with-fd.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const fs = require('fs');
const file = path.join(path.resolve(__dirname, '../fixtures'), 'alice.html');

const bench = common.createBenchmark(main, {
requests: [100, 1000, 10000, 100000, 1000000],
requests: [100, 1000, 10000, 100000],
streams: [100, 200, 1000],
clients: [1, 2],
benchmarker: ['h2load']
Expand Down
9 changes: 8 additions & 1 deletion doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,14 @@ client.
### ERR_HTTP2_SEND_FILE

An attempt was made to use the `Http2Stream.prototype.responseWithFile()` API to
send something other than a regular file.
send a directory.

<a id="ERR_HTTP2_SEND_FILE_NOSEEK"></a>
### ERR_HTTP2_SEND_FILE_NOSEEK

An attempt was made to use the `Http2Stream.prototype.responseWithFile()` API to
send something other than a regular file, but `offset` or `length` options were
provided.

<a id="ERR_HTTP2_SESSION_ERROR"></a>
### ERR_HTTP2_SESSION_ERROR
Expand Down
10 changes: 10 additions & 0 deletions doc/api/http2.md
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,11 @@ if the `getTrailers` callback attempts to set such header fields.
#### http2stream.respondWithFD(fd[, headers[, options]])
<!-- YAML
added: v8.4.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/18936
description: Any readable file descriptor, not necessarily for a
regular file, is supported now.
-->

* `fd` {number} A readable file descriptor.
Expand Down Expand Up @@ -1313,6 +1318,11 @@ if the `getTrailers` callback attempts to set such header fields.
#### http2stream.respondWithFile(path[, headers[, options]])
<!-- YAML
added: v8.4.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/18936
description: Any readable file, not necessarily a
regular file, is supported now.
-->

* `path` {string|Buffer|URL}
Expand Down
4 changes: 3 additions & 1 deletion lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,9 @@ E('ERR_HTTP2_PING_LENGTH', 'HTTP2 ping payload must be 8 bytes', RangeError);
E('ERR_HTTP2_PSEUDOHEADER_NOT_ALLOWED',
'Cannot set HTTP/2 pseudo-headers', Error);
E('ERR_HTTP2_PUSH_DISABLED', 'HTTP/2 client has disabled push streams', Error);
E('ERR_HTTP2_SEND_FILE', 'Only regular files can be sent', Error);
E('ERR_HTTP2_SEND_FILE', 'Directories cannot be sent', Error);
E('ERR_HTTP2_SEND_FILE_NOSEEK',
'Offset or length can only be specified for regular files', Error);
E('ERR_HTTP2_SESSION_ERROR', 'Session closed with error code %s', Error);
E('ERR_HTTP2_SOCKET_BOUND',
'The socket is already bound to an Http2Session', Error);
Expand Down
94 changes: 72 additions & 22 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@

require('internal/util').assertCrypto();

const { internalBinding } = require('internal/bootstrap_loaders');
const { async_id_symbol } = require('internal/async_hooks').symbols;
const { UV_EOF } = process.binding('uv');
const http = require('http');
const binding = process.binding('http2');
const { FileHandle } = process.binding('fs');
const { StreamPipe } = internalBinding('stream_pipe');
const assert = require('assert');
const { Buffer } = require('buffer');
const EventEmitter = require('events');
Expand Down Expand Up @@ -38,6 +42,7 @@ const {
ERR_HTTP2_PING_LENGTH,
ERR_HTTP2_PUSH_DISABLED,
ERR_HTTP2_SEND_FILE,
ERR_HTTP2_SEND_FILE_NOSEEK,
ERR_HTTP2_SESSION_ERROR,
ERR_HTTP2_SETTINGS_CANCEL,
ERR_HTTP2_SOCKET_BOUND,
Expand Down Expand Up @@ -65,6 +70,7 @@ const { onServerStream,
const { utcDate } = require('internal/http');
const { promisify } = require('internal/util');
const { isArrayBufferView } = require('internal/util/types');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
const { _connectionListener: httpConnectionListener } = require('http');
const { createPromise, promiseResolve } = process.binding('util');
const debug = util.debuglog('http2');
Expand Down Expand Up @@ -345,9 +351,7 @@ function onStreamClose(code) {
stream.end();
}

if (state.fd !== undefined)
tryClose(state.fd);

state.fd = -1;
// Defer destroy we actually emit end.
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
Expand Down Expand Up @@ -1928,6 +1932,26 @@ function processHeaders(headers) {
return headers;
}

function onFileCloseError(stream, err) {
stream.emit(err);
}

function onFileUnpipe() {
const stream = this.sink[kOwner];
if (stream.ownsFd)
this.source.close().catch(onFileCloseError.bind(stream));
else
this.source.releaseFD();
}

// This is only called once the pipe has returned back control, so
// it only has to handle errors and End-of-File.
function onPipedFileHandleRead(err) {
if (err < 0 && err !== UV_EOF) {
this.stream.close(NGHTTP2_INTERNAL_ERROR);
}
}

function processRespondWithFD(self, fd, headers, offset = 0, length = -1,
streamOptions = 0) {
const state = self[kState];
Expand All @@ -1940,18 +1964,32 @@ function processRespondWithFD(self, fd, headers, offset = 0, length = -1,
return;
}


// Close the writable side of the stream
// Close the writable side of the stream, but only as far as the writable
// stream implementation is concerned.
self._final = null;
self.end();

const ret = self[kHandle].respondFD(fd, headersList,
offset, length,
streamOptions);
const ret = self[kHandle].respond(headersList, streamOptions);

if (ret < 0) {
self.destroy(new NghttpError(ret));
return;
}

defaultTriggerAsyncIdScope(self[async_id_symbol], startFilePipe,
self, fd, offset, length);
}

function startFilePipe(self, fd, offset, length) {
const handle = new FileHandle(fd, offset, length);
handle.onread = onPipedFileHandleRead;
handle.stream = self;

const pipe = new StreamPipe(handle._externalStream,
self[kHandle]._externalStream);
pipe.onunpipe = onFileUnpipe;
pipe.start();

// exact length of the file doesn't matter here, since the
// stream is closing anyway - just use 1 to signify that
// a write does exist
Expand Down Expand Up @@ -2008,12 +2046,21 @@ function doSendFileFD(session, options, fd, headers, streamOptions, err, stat) {
}

if (!stat.isFile()) {
const err = new ERR_HTTP2_SEND_FILE();
if (onError)
onError(err);
else
this.destroy(err);
return;
const isDirectory = stat.isDirectory();
if (options.offset !== undefined || options.offset > 0 ||
options.length !== undefined || options.length >= 0 ||
isDirectory) {
const err = isDirectory ?
new ERR_HTTP2_SEND_FILE() : new ERR_HTTP2_SEND_FILE_NOSEEK();
if (onError)
onError(err);
else
this.destroy(err);
return;
}

options.offset = -1;
options.length = -1;
}

if (this.destroyed || this.closed) {
Expand All @@ -2038,12 +2085,14 @@ function doSendFileFD(session, options, fd, headers, streamOptions, err, stat) {
return;
}

statOptions.length =
statOptions.length < 0 ? stat.size - (+statOptions.offset) :
Math.min(stat.size - (+statOptions.offset),
statOptions.length);
if (stat.isFile()) {
statOptions.length =
statOptions.length < 0 ? stat.size - (+statOptions.offset) :
Math.min(stat.size - (+statOptions.offset),
statOptions.length);

headers[HTTP2_HEADER_CONTENT_LENGTH] = statOptions.length;
headers[HTTP2_HEADER_CONTENT_LENGTH] = statOptions.length;
}

processRespondWithFD(this, fd, headers,
options.offset | 0,
Expand Down Expand Up @@ -2270,8 +2319,9 @@ class ServerHttp2Stream extends Http2Stream {
throw new ERR_INVALID_ARG_TYPE('fd', 'number');

debug(`Http2Stream ${this[kID]} [Http2Session ` +
`${sessionName(session[kType])}]: initiating response`);
`${sessionName(session[kType])}]: initiating response from fd`);
this[kUpdateTimer]();
this.ownsFd = false;

headers = processHeaders(headers);
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
Expand Down Expand Up @@ -2333,9 +2383,9 @@ class ServerHttp2Stream extends Http2Stream {

const session = this[kSession];
debug(`Http2Stream ${this[kID]} [Http2Session ` +
`${sessionName(session[kType])}]: initiating response`);
`${sessionName(session[kType])}]: initiating response from file`);
this[kUpdateTimer]();

this.ownsFd = true;

headers = processHeaders(headers);
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
Expand Down
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@
'src/string_decoder.cc',
'src/string_search.cc',
'src/stream_base.cc',
'src/stream_pipe.cc',
'src/stream_wrap.cc',
'src/tcp_wrap.cc',
'src/timer_wrap.cc',
Expand Down Expand Up @@ -394,6 +395,7 @@
'src/string_decoder-inl.h',
'src/stream_base.h',
'src/stream_base-inl.h',
'src/stream_pipe.h',
'src/stream_wrap.h',
'src/tracing/agent.h',
'src/tracing/node_trace_buffer.h',
Expand Down
16 changes: 16 additions & 0 deletions src/async_wrap-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,22 @@ inline double AsyncWrap::get_trigger_async_id() const {
}


inline AsyncWrap::AsyncScope::AsyncScope(AsyncWrap* wrap)
: wrap_(wrap) {
Environment* env = wrap->env();
if (env->async_hooks()->fields()[Environment::AsyncHooks::kBefore] == 0)
return;
EmitBefore(env, wrap->get_async_id());
}

inline AsyncWrap::AsyncScope::~AsyncScope() {
Environment* env = wrap_->env();
if (env->async_hooks()->fields()[Environment::AsyncHooks::kAfter] == 0)
return;
EmitAfter(env, wrap_->get_async_id());
}


inline v8::MaybeLocal<v8::Value> AsyncWrap::MakeCallback(
const v8::Local<v8::String> symbol,
int argc,
Expand Down
13 changes: 13 additions & 0 deletions src/async_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ namespace node {
V(SHUTDOWNWRAP) \
V(SIGNALWRAP) \
V(STATWATCHER) \
V(STREAMPIPE) \
V(TCPCONNECTWRAP) \
V(TCPSERVERWRAP) \
V(TCPWRAP) \
Expand Down Expand Up @@ -169,6 +170,18 @@ class AsyncWrap : public BaseObject {

static void WeakCallback(const v8::WeakCallbackInfo<DestroyParam> &info);

// This is a simplified version of InternalCallbackScope that only runs
// the `before` and `after` hooks. Only use it when not actually calling
// back into JS; otherwise, use InternalCallbackScope.
class AsyncScope {
public:
explicit inline AsyncScope(AsyncWrap* wrap);
~AsyncScope();

private:
AsyncWrap* wrap_ = nullptr;
};

private:
friend class PromiseWrap;

Expand Down
13 changes: 13 additions & 0 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,14 @@ inline void Environment::set_http_parser_buffer(char* buffer) {
http_parser_buffer_ = buffer;
}

inline bool Environment::http_parser_buffer_in_use() const {
return http_parser_buffer_in_use_;
}

inline void Environment::set_http_parser_buffer_in_use(bool in_use) {
http_parser_buffer_in_use_ = in_use;
}

inline http2::http2_state* Environment::http2_state() const {
return http2_state_.get();
}
Expand All @@ -484,6 +492,11 @@ Environment::fs_stats_field_array() {
return &fs_stats_field_array_;
}

inline std::vector<std::unique_ptr<fs::FileHandleReadWrap>>&
Environment::file_handle_read_wrap_freelist() {
return file_handle_read_wrap_freelist_;
}

void Environment::CreateImmediate(native_immediate_callback cb,
void* data,
v8::Local<v8::Object> obj,
Expand Down
1 change: 1 addition & 0 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "async_wrap.h"
#include "node_buffer.h"
#include "node_platform.h"
#include "node_file.h"

#include <stdio.h>
#include <algorithm>
Expand Down
Loading