diff --git a/src/node_http2.cc b/src/node_http2.cc index b5b38fc976158b..f4f8eeb17c6ebf 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1518,12 +1518,12 @@ void Http2Session::ClearOutgoing(int status) { std::vector current_outgoing_buffers_; current_outgoing_buffers_.swap(outgoing_buffers_); for (const NgHttp2StreamWrite& wr : current_outgoing_buffers_) { - WriteWrap* wrap = wr.req_wrap; - if (wrap != nullptr) { + BaseObjectPtr wrap = std::move(wr.req_wrap); + if (wrap) { // TODO(addaleax): Pass `status` instead of 0, so that we actually error // out with the error from the write to the underlying protocol, // if one occurred. - wrap->Done(0); + WriteWrap::FromObject(wrap)->Done(0); } } } @@ -1806,7 +1806,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { bool Http2Session::HasWritesOnSocketForStream(Http2Stream* stream) { for (const NgHttp2StreamWrite& wr : outgoing_buffers_) { - if (wr.req_wrap != nullptr && wr.req_wrap->stream() == stream) + if (wr.req_wrap && WriteWrap::FromObject(wr.req_wrap)->stream() == stream) return true; } return false; @@ -1959,8 +1959,8 @@ void Http2Stream::Destroy() { // we still have queued outbound writes. while (!queue_.empty()) { NgHttp2StreamWrite& head = queue_.front(); - if (head.req_wrap != nullptr) - head.req_wrap->Done(UV_ECANCELED); + if (head.req_wrap) + WriteWrap::FromObject(head.req_wrap)->Done(UV_ECANCELED); queue_.pop(); } @@ -2189,7 +2189,8 @@ int Http2Stream::DoWrite(WriteWrap* req_wrap, // Store the req_wrap on the last write info in the queue, so that it is // only marked as finished once all buffers associated with it are finished. queue_.emplace(NgHttp2StreamWrite { - i == nbufs - 1 ? req_wrap : nullptr, + BaseObjectPtr( + i == nbufs - 1 ? req_wrap->GetAsyncWrap() : nullptr), bufs[i] }); IncrementAvailableOutboundLength(bufs[i].len); @@ -2283,10 +2284,11 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle, // find out when the HTTP2 stream wants to consume data, and because the // StreamBase API allows empty input chunks. while (!stream->queue_.empty() && stream->queue_.front().buf.len == 0) { - WriteWrap* finished = stream->queue_.front().req_wrap; + BaseObjectPtr finished = + std::move(stream->queue_.front().req_wrap); stream->queue_.pop(); - if (finished != nullptr) - finished->Done(0); + if (finished) + WriteWrap::FromObject(finished)->Done(0); } if (!stream->queue_.empty()) { @@ -2912,8 +2914,8 @@ void Http2Ping::DetachFromSession() { } void NgHttp2StreamWrite::MemoryInfo(MemoryTracker* tracker) const { - if (req_wrap != nullptr) - tracker->TrackField("req_wrap", req_wrap->GetAsyncWrap()); + if (req_wrap) + tracker->TrackField("req_wrap", req_wrap); tracker->TrackField("buf", buf); } diff --git a/src/node_http2.h b/src/node_http2.h index 4ef9e9fbab0c43..e49c3a60f32bcc 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -145,12 +145,12 @@ using Http2Headers = NgHeaders; using Http2RcBufferPointer = NgRcBufPointer; struct NgHttp2StreamWrite : public MemoryRetainer { - WriteWrap* req_wrap = nullptr; + BaseObjectPtr req_wrap; uv_buf_t buf; inline explicit NgHttp2StreamWrite(uv_buf_t buf_) : buf(buf_) {} - inline NgHttp2StreamWrite(WriteWrap* req, uv_buf_t buf_) : - req_wrap(req), buf(buf_) {} + inline NgHttp2StreamWrite(BaseObjectPtr req_wrap, uv_buf_t buf_) : + req_wrap(std::move(req_wrap)), buf(buf_) {} void MemoryInfo(MemoryTracker* tracker) const override; SET_MEMORY_INFO_NAME(NgHttp2StreamWrite) diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index bd7224e9c0245e..c003ffc1ef6595 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -243,6 +243,28 @@ StreamBase* StreamBase::FromObject(v8::Local obj) { StreamBase::kStreamBaseField)); } +WriteWrap* WriteWrap::FromObject(v8::Local req_wrap_obj) { + return static_cast(StreamReq::FromObject(req_wrap_obj)); +} + +template +WriteWrap* WriteWrap::FromObject( + const BaseObjectPtrImpl& base_obj) { + if (!base_obj) return nullptr; + return FromObject(base_obj->object()); +} + +ShutdownWrap* ShutdownWrap::FromObject(v8::Local req_wrap_obj) { + return static_cast(StreamReq::FromObject(req_wrap_obj)); +} + +template +ShutdownWrap* ShutdownWrap::FromObject( + const BaseObjectPtrImpl& base_obj) { + if (!base_obj) return nullptr; + return FromObject(base_obj->object()); +} + void WriteWrap::SetAllocatedStorage(AllocatedBuffer&& storage) { CHECK_NULL(storage_.data()); storage_ = std::move(storage); diff --git a/src/stream_base.cc b/src/stream_base.cc index b35df39afe9d8c..3ad201746007dc 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -621,12 +621,16 @@ StreamResource::~StreamResource() { ShutdownWrap* StreamBase::CreateShutdownWrap( Local object) { - return new SimpleShutdownWrap(this, object); + auto* wrap = new SimpleShutdownWrap(this, object); + wrap->MakeWeak(); + return wrap; } WriteWrap* StreamBase::CreateWriteWrap( Local object) { - return new SimpleWriteWrap(this, object); + auto* wrap = new SimpleWriteWrap(this, object); + wrap->MakeWeak(); + return wrap; } } // namespace node diff --git a/src/stream_base.h b/src/stream_base.h index 7c6bcba81edd03..72142309fe1902 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -77,6 +77,11 @@ class ShutdownWrap : public StreamReq { StreamBase* stream, v8::Local req_wrap_obj); + static inline ShutdownWrap* FromObject(v8::Local req_wrap_obj); + template + static inline ShutdownWrap* FromObject( + const BaseObjectPtrImpl& base_obj); + // Call stream()->EmitAfterShutdown() and dispose of this request wrap. void OnDone(int status) override; }; @@ -89,6 +94,11 @@ class WriteWrap : public StreamReq { StreamBase* stream, v8::Local req_wrap_obj); + static inline WriteWrap* FromObject(v8::Local req_wrap_obj); + template + static inline WriteWrap* FromObject( + const BaseObjectPtrImpl& base_obj); + // Call stream()->EmitAfterWrite() and dispose of this request wrap. void OnDone(int status) override; diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 91faeafb62b660..85591902d5fabd 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -95,9 +95,10 @@ bool TLSWrap::InvokeQueued(int status, const char* error_str) { if (!write_callback_scheduled_) return false; - if (current_write_ != nullptr) { - WriteWrap* w = current_write_; - current_write_ = nullptr; + if (current_write_) { + BaseObjectPtr current_write = std::move(current_write_); + current_write_.reset(); + WriteWrap* w = WriteWrap::FromObject(current_write); w->Done(status, error_str); } @@ -301,7 +302,7 @@ void TLSWrap::EncOut() { } // Split-off queue - if (established_ && current_write_ != nullptr) { + if (established_ && current_write_) { Debug(this, "EncOut() setting write_callback_scheduled_"); write_callback_scheduled_ = true; } @@ -372,10 +373,12 @@ void TLSWrap::EncOut() { void TLSWrap::OnStreamAfterWrite(WriteWrap* req_wrap, int status) { Debug(this, "OnStreamAfterWrite(status = %d)", status); - if (current_empty_write_ != nullptr) { + if (current_empty_write_) { Debug(this, "Had empty write"); - WriteWrap* finishing = current_empty_write_; - current_empty_write_ = nullptr; + BaseObjectPtr current_empty_write = + std::move(current_empty_write_); + current_empty_write_.reset(); + WriteWrap* finishing = WriteWrap::FromObject(current_empty_write); finishing->Done(status); return; } @@ -735,14 +738,14 @@ int TLSWrap::DoWrite(WriteWrap* w, ClearOut(); if (BIO_pending(enc_out_) == 0) { Debug(this, "No pending encrypted output, writing to underlying stream"); - CHECK_NULL(current_empty_write_); - current_empty_write_ = w; + CHECK(!current_empty_write_); + current_empty_write_.reset(w->GetAsyncWrap()); StreamWriteResult res = underlying_stream()->Write(bufs, count, send_handle); if (!res.async) { BaseObjectPtr strong_ref{this}; env()->SetImmediate([this, strong_ref](Environment* env) { - OnStreamAfterWrite(current_empty_write_, 0); + OnStreamAfterWrite(WriteWrap::FromObject(current_empty_write_), 0); }); } return 0; @@ -750,8 +753,8 @@ int TLSWrap::DoWrite(WriteWrap* w, } // Store the current write wrap - CHECK_NULL(current_write_); - current_write_ = w; + CHECK(!current_write_); + current_write_.reset(w->GetAsyncWrap()); // Write encrypted data to underlying stream and call Done(). if (length == 0) { @@ -804,7 +807,7 @@ int TLSWrap::DoWrite(WriteWrap* w, // If we stopped writing because of an error, it's fatal, discard the data. if (!arg.IsEmpty()) { Debug(this, "Got SSL error (%d), returning UV_EPROTO", err); - current_write_ = nullptr; + current_write_.reset(); return UV_EPROTO; } diff --git a/src/tls_wrap.h b/src/tls_wrap.h index 7b8e50de9d4689..579f53cf9a391a 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -194,9 +194,9 @@ class TLSWrap : public AsyncWrap, // Waiting for ClearIn() to pass to SSL_write(). AllocatedBuffer pending_cleartext_input_; size_t write_size_ = 0; - WriteWrap* current_write_ = nullptr; + BaseObjectPtr current_write_; bool in_dowrite_ = false; - WriteWrap* current_empty_write_ = nullptr; + BaseObjectPtr current_empty_write_; bool write_callback_scheduled_ = false; bool started_ = false; bool established_ = false;