diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 2a4ef0421f7c84..607ae3fd2d297d 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -4,9 +4,13 @@ require('internal/util').assertCrypto(); +const { internalBinding } = require('internal/bootstrap_loaders'); const { async_id_symbol } = require('internal/async_hooks').symbols; +const { UV_EOF } = process.binding('uv'); const http = require('http'); const binding = process.binding('http2'); +const { FileHandle } = process.binding('fs'); +const { StreamPipe } = internalBinding('stream_pipe'); const assert = require('assert'); const { Buffer } = require('buffer'); const EventEmitter = require('events'); @@ -65,6 +69,7 @@ const { onServerStream, const { utcDate } = require('internal/http'); const { promisify } = require('internal/util'); const { isArrayBufferView } = require('internal/util/types'); +const { defaultTriggerAsyncIdScope } = require('internal/async_hooks'); const { _connectionListener: httpConnectionListener } = require('http'); const { createPromise, promiseResolve } = process.binding('util'); const debug = util.debuglog('http2'); @@ -345,9 +350,7 @@ function onStreamClose(code) { stream.end(); } - if (state.fd !== undefined) - tryClose(state.fd); - + state.fd = -1; // Defer destroy we actually emit end. if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) { // If errored or ended, we can destroy immediately. @@ -1928,6 +1931,26 @@ function processHeaders(headers) { return headers; } +function onFileCloseError(stream, err) { + stream.emit(err); +} + +function onFileUnpipe() { + const stream = this.sink[kOwner]; + if (stream.ownsFd) + this.source.close().catch(onFileCloseError.bind(stream)); + else + this.source.releaseFD(); +} + +// This is only called once the pipe has returned back control, so +// it only has to handle errors and End-of-File. +function onPipedFileHandleRead(err) { + if (err < 0 && err !== UV_EOF) { + this.stream.close(NGHTTP2_INTERNAL_ERROR); + } +} + function processRespondWithFD(self, fd, headers, offset = 0, length = -1, streamOptions = 0) { const state = self[kState]; @@ -1940,18 +1963,32 @@ function processRespondWithFD(self, fd, headers, offset = 0, length = -1, return; } - - // Close the writable side of the stream + // Close the writable side of the stream, but only as far as the writable + // stream implementation is concerned. + self._final = null; self.end(); - const ret = self[kHandle].respondFD(fd, headersList, - offset, length, - streamOptions); + const ret = self[kHandle].respond(headersList, streamOptions); if (ret < 0) { self.destroy(new NghttpError(ret)); return; } + + defaultTriggerAsyncIdScope(self[async_id_symbol], startFilePipe, + self, fd, offset, length); +} + +function startFilePipe(self, fd, offset, length) { + const handle = new FileHandle(fd, offset, length); + handle.onread = onPipedFileHandleRead; + handle.stream = self; + + const pipe = new StreamPipe(handle._externalStream, + self[kHandle]._externalStream); + pipe.onunpipe = onFileUnpipe; + pipe.start(); + // exact length of the file doesn't matter here, since the // stream is closing anyway - just use 1 to signify that // a write does exist @@ -2270,8 +2307,9 @@ class ServerHttp2Stream extends Http2Stream { throw new ERR_INVALID_ARG_TYPE('fd', 'number'); debug(`Http2Stream ${this[kID]} [Http2Session ` + - `${sessionName(session[kType])}]: initiating response`); + `${sessionName(session[kType])}]: initiating response from fd`); this[kUpdateTimer](); + this.ownsFd = false; headers = processHeaders(headers); const statusCode = headers[HTTP2_HEADER_STATUS] |= 0; @@ -2333,9 +2371,9 @@ class ServerHttp2Stream extends Http2Stream { const session = this[kSession]; debug(`Http2Stream ${this[kID]} [Http2Session ` + - `${sessionName(session[kType])}]: initiating response`); + `${sessionName(session[kType])}]: initiating response from file`); this[kUpdateTimer](); - + this.ownsFd = true; headers = processHeaders(headers); const statusCode = headers[HTTP2_HEADER_STATUS] |= 0; diff --git a/src/node_http2.cc b/src/node_http2.cc index d6df93cf3804a7..68a684025ce4a0 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1888,28 +1888,6 @@ inline int Http2Stream::SubmitResponse(nghttp2_nv* nva, } -// Initiate a response that contains data read from a file descriptor. -inline int Http2Stream::SubmitFile(int fd, - nghttp2_nv* nva, size_t len, - int64_t offset, - int64_t length, - int options) { - CHECK(!this->IsDestroyed()); - Http2Scope h2scope(this); - DEBUG_HTTP2STREAM(this, "submitting file"); - if (options & STREAM_OPTION_GET_TRAILERS) - flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS; - - if (offset > 0) fd_offset_ = offset; - if (length > -1) fd_length_ = length; - - Http2Stream::Provider::FD prov(this, options, fd); - int ret = nghttp2_submit_response(session_->session(), id_, nva, len, *prov); - CHECK_NE(ret, NGHTTP2_ERR_NOMEM); - return ret; -} - - // Submit informational headers for a stream. inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) { CHECK(!this->IsDestroyed()); @@ -2085,87 +2063,6 @@ Http2Stream::Provider::~Provider() { provider_.source.ptr = nullptr; } -// The FD Provider pulls data from a file descriptor using libuv. All of the -// data transfer occurs in C++, without any chunks being passed through JS -// land. -Http2Stream::Provider::FD::FD(Http2Stream* stream, int options, int fd) - : Http2Stream::Provider(stream, options) { - CHECK(!stream->IsDestroyed()); - provider_.source.fd = fd; - provider_.read_callback = Http2Stream::Provider::FD::OnRead; -} - -Http2Stream::Provider::FD::FD(int options, int fd) - : Http2Stream::Provider(options) { - provider_.source.fd = fd; - provider_.read_callback = Http2Stream::Provider::FD::OnRead; -} - -ssize_t Http2Stream::Provider::FD::OnRead(nghttp2_session* handle, - int32_t id, - uint8_t* buf, - size_t length, - uint32_t* flags, - nghttp2_data_source* source, - void* user_data) { - Http2Session* session = static_cast(user_data); - Http2Stream* stream = session->FindStream(id); - if (stream->statistics_.first_byte_sent == 0) - stream->statistics_.first_byte_sent = uv_hrtime(); - - DEBUG_HTTP2SESSION2(session, "reading outbound file data for stream %d", id); - CHECK_EQ(id, stream->id()); - - int fd = source->fd; - int64_t offset = stream->fd_offset_; - ssize_t numchars = 0; - - if (stream->fd_length_ >= 0 && - stream->fd_length_ < static_cast(length)) - length = stream->fd_length_; - - uv_buf_t data; - data.base = reinterpret_cast(buf); - data.len = length; - - uv_fs_t read_req; - - if (length > 0) { - // TODO(addaleax): Never use synchronous I/O on the main thread. - numchars = uv_fs_read(session->event_loop(), - &read_req, - fd, &data, 1, - offset, nullptr); - uv_fs_req_cleanup(&read_req); - } - - // Close the stream with an error if reading fails - if (numchars < 0) - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; - - // Update the read offset for the next read - stream->fd_offset_ += numchars; - stream->fd_length_ -= numchars; - - DEBUG_HTTP2SESSION2(session, "sending %d bytes", numchars); - - // if numchars < length, assume that we are done. - if (static_cast(numchars) < length || length <= 0) { - DEBUG_HTTP2SESSION2(session, "no more data for stream %d", id); - *flags |= NGHTTP2_DATA_FLAG_EOF; - session->GetTrailers(stream, flags); - // If the stream or session gets destroyed during the GetTrailers - // callback, check that here and close down the stream - if (stream->IsDestroyed()) - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; - if (session->IsDestroyed()) - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - - stream->statistics_.sent_bytes += numchars; - return numchars; -} - // The Stream Provider pulls data from a linked list of uv_buf_t structs // built via the StreamBase API and the Streams js API. Http2Stream::Provider::Stream::Stream(int options) @@ -2508,27 +2405,6 @@ void Http2Stream::Respond(const FunctionCallbackInfo& args) { DEBUG_HTTP2STREAM(stream, "response submitted"); } -// Initiates a response on the Http2Stream using a file descriptor to provide -// outbound DATA frames. -void Http2Stream::RespondFD(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - Local context = env->context(); - Isolate* isolate = env->isolate(); - Http2Stream* stream; - ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder()); - - int fd = args[0]->Int32Value(context).ToChecked(); - Local headers = args[1].As(); - - int64_t offset = args[2]->IntegerValue(context).ToChecked(); - int64_t length = args[3]->IntegerValue(context).ToChecked(); - int options = args[4]->IntegerValue(context).ToChecked(); - - Headers list(isolate, context, headers); - args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length(), - offset, length, options)); - DEBUG_HTTP2STREAM2(stream, "file response submitted for fd %d", fd); -} // Submits informational headers on the Http2Stream void Http2Stream::Info(const FunctionCallbackInfo& args) { @@ -2891,7 +2767,6 @@ void Initialize(Local target, env->SetProtoMethod(stream, "priority", Http2Stream::Priority); env->SetProtoMethod(stream, "pushPromise", Http2Stream::PushPromise); env->SetProtoMethod(stream, "info", Http2Stream::Info); - env->SetProtoMethod(stream, "respondFD", Http2Stream::RespondFD); env->SetProtoMethod(stream, "respond", Http2Stream::Respond); env->SetProtoMethod(stream, "rstStream", Http2Stream::RstStream); env->SetProtoMethod(stream, "refreshState", Http2Stream::RefreshState); diff --git a/src/node_http2.h b/src/node_http2.h index 2d55989fd7d2e7..0fac6cca00f4db 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -580,13 +580,6 @@ class Http2Stream : public AsyncWrap, size_t len, int options); - // Send data read from a file descriptor as the response on this stream. - inline int SubmitFile(int fd, - nghttp2_nv* nva, size_t len, - int64_t offset, - int64_t length, - int options); - // Submit informational headers for this stream inline int SubmitInfo(nghttp2_nv* nva, size_t len); @@ -709,7 +702,6 @@ class Http2Stream : public AsyncWrap, static void PushPromise(const FunctionCallbackInfo& args); static void RefreshState(const FunctionCallbackInfo& args); static void Info(const FunctionCallbackInfo& args); - static void RespondFD(const FunctionCallbackInfo& args); static void Respond(const FunctionCallbackInfo& args); static void RstStream(const FunctionCallbackInfo& args); @@ -753,8 +745,6 @@ class Http2Stream : public AsyncWrap, // waiting to be written out to the socket. std::queue queue_; size_t available_outbound_length_ = 0; - int64_t fd_offset_ = 0; - int64_t fd_length_ = -1; Http2StreamListener stream_listener_; @@ -780,20 +770,6 @@ class Http2Stream::Provider { bool empty_ = false; }; -class Http2Stream::Provider::FD : public Http2Stream::Provider { - public: - FD(int options, int fd); - FD(Http2Stream* stream, int options, int fd); - - static ssize_t OnRead(nghttp2_session* session, - int32_t id, - uint8_t* buf, - size_t length, - uint32_t* flags, - nghttp2_data_source* source, - void* user_data); -}; - class Http2Stream::Provider::Stream : public Http2Stream::Provider { public: Stream(Http2Stream* stream, int options); diff --git a/test/parallel/test-http2-respond-with-fd-errors.js b/test/parallel/test-http2-respond-with-fd-errors.js index 0eccd231c63a2e..3a671a3e36490a 100644 --- a/test/parallel/test-http2-respond-with-fd-errors.js +++ b/test/parallel/test-http2-respond-with-fd-errors.js @@ -46,8 +46,8 @@ const tests = specificTests.concat(genericTests); let currentError; -// mock respondFD because we only care about testing error handling -Http2Stream.prototype.respondFD = () => currentError.ngError; +// mock `respond` because we only care about testing error handling +Http2Stream.prototype.respond = () => currentError.ngError; const server = http2.createServer(); server.on('stream', common.mustCall((stream, headers) => {