From 313b0268bf4d6ca9b400bea782b3b7b670c9827f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 30 Jul 2024 13:34:52 +0100 Subject: [PATCH] refactor(ext/node): create separate ops for node:http module (#24788) This commit duplicates ops from "ext/fetch" to "ext/node" to kick off a bigger rewrite of "node:http". Most of duplication is temporary and will be removed as these ops evolve. (cherry picked from commit c6ecf70a0963ce5cac67af97c55aa360a1d25c4b) --- Cargo.lock | 2 + ext/node/Cargo.toml | 2 + ext/node/lib.rs | 2 + ext/node/ops/http.rs | 412 ++++++++++++++++++++++++++++++++++++- ext/node/polyfills/http.ts | 8 +- 5 files changed, 420 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e872982b7b7d2d..3549738740dea6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1759,6 +1759,8 @@ dependencies = [ "home", "http 1.1.0", "http-body-util", + "hyper 1.4.1", + "hyper-util", "idna 0.3.0", "indexmap", "ipnetwork", diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml index 8bbc13ee60238f..44cf81481dc904 100644 --- a/ext/node/Cargo.toml +++ b/ext/node/Cargo.toml @@ -46,6 +46,8 @@ hkdf.workspace = true home = "0.5.9" http.workspace = true http-body-util.workspace = true +hyper.workspace = true +hyper-util.workspace = true idna = "0.3.0" indexmap.workspace = true ipnetwork = "0.20.0" diff --git a/ext/node/lib.rs b/ext/node/lib.rs index 8d6d76f09a7aad..899ffc44e21ce0 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -327,6 +327,8 @@ deno_core::extension!(deno_node, ops::zlib::brotli::op_brotli_decompress_stream, ops::zlib::brotli::op_brotli_decompress_stream_end, ops::http::op_node_http_request

, + ops::http::op_node_http_fetch_response_upgrade, + ops::http::op_node_http_fetch_send, ops::http2::op_http2_connect, ops::http2::op_http2_poll_client_connection, ops::http2::op_http2_client_request, diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs index 19847820e12bab..4b1f99ec045fe9 100644 --- a/ext/node/ops/http.rs +++ b/ext/node/ops/http.rs @@ -1,20 +1,42 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. +use std::borrow::Cow; +use std::cell::RefCell; +use std::pin::Pin; +use std::rc::Rc; +use std::task::Context; +use std::task::Poll; + +use bytes::Bytes; +use deno_core::anyhow; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures::stream::Peekable; +use deno_core::futures::Future; +use deno_core::futures::FutureExt; +use deno_core::futures::Stream; +use deno_core::futures::StreamExt; use deno_core::op2; +use deno_core::serde::Serialize; +use deno_core::unsync::spawn; use deno_core::url::Url; +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::BufView; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ResourceId; use deno_fetch::get_or_create_client_from_state; use deno_fetch::FetchCancelHandle; use deno_fetch::FetchRequestResource; use deno_fetch::FetchReturn; use deno_fetch::HttpClientResource; -use deno_fetch::ResourceToBodyAdapter; +use deno_fetch::ResBody; use http::header::HeaderMap; use http::header::HeaderName; use http::header::HeaderValue; @@ -22,6 +44,11 @@ use http::header::AUTHORIZATION; use http::header::CONTENT_LENGTH; use http::Method; use http_body_util::BodyExt; +use hyper::body::Frame; +use hyper_util::rt::TokioIo; +use std::cmp::min; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; #[op2] #[serde] @@ -64,7 +91,9 @@ where let (body, con_len) = if let Some(body) = body { ( - ResourceToBodyAdapter::new(state.resource_table.take_any(body)?).boxed(), + BodyExt::boxed(NodeHttpResourceToBodyAdapter::new( + state.resource_table.take_any(body)?, + )), None, ) } else { @@ -125,3 +154,382 @@ where cancel_handle_rid: Some(cancel_handle_rid), }) } + +#[derive(Default, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct NodeHttpFetchResponse { + pub status: u16, + pub status_text: String, + pub headers: Vec<(ByteString, ByteString)>, + pub url: String, + pub response_rid: ResourceId, + pub content_length: Option, + pub remote_addr_ip: Option, + pub remote_addr_port: Option, + pub error: Option, +} + +#[op2(async)] +#[serde] +pub async fn op_node_http_fetch_send( + state: Rc>, + #[smi] rid: ResourceId, +) -> Result { + let request = state + .borrow_mut() + .resource_table + .take::(rid)?; + + let request = Rc::try_unwrap(request) + .ok() + .expect("multiple op_node_http_fetch_send ongoing"); + + let res = match request.future.await { + Ok(Ok(res)) => res, + Ok(Err(err)) => { + // We're going to try and rescue the error cause from a stream and return it from this fetch. + // If any error in the chain is a hyper body error, return that as a special result we can use to + // reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`). + // TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead + let mut err_ref: &dyn std::error::Error = err.as_ref(); + while let Some(err) = std::error::Error::source(err_ref) { + if let Some(err) = err.downcast_ref::() { + if let Some(err) = std::error::Error::source(err) { + return Ok(NodeHttpFetchResponse { + error: Some(err.to_string()), + ..Default::default() + }); + } + } + err_ref = err; + } + + return Err(type_error(err.to_string())); + } + Err(_) => return Err(type_error("request was cancelled")), + }; + + let status = res.status(); + let url = request.url.into(); + let mut res_headers = Vec::new(); + for (key, val) in res.headers().iter() { + res_headers.push((key.as_str().into(), val.as_bytes().into())); + } + + let content_length = hyper::body::Body::size_hint(res.body()).exact(); + let remote_addr = res + .extensions() + .get::() + .map(|info| info.remote_addr()); + let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr { + (Some(addr.ip().to_string()), Some(addr.port())) + } else { + (None, None) + }; + + let response_rid = state + .borrow_mut() + .resource_table + .add(NodeHttpFetchResponseResource::new(res, content_length)); + + Ok(NodeHttpFetchResponse { + status: status.as_u16(), + status_text: status.canonical_reason().unwrap_or("").to_string(), + headers: res_headers, + url, + response_rid, + content_length, + remote_addr_ip, + remote_addr_port, + error: None, + }) +} + +#[op2(async)] +#[smi] +pub async fn op_node_http_fetch_response_upgrade( + state: Rc>, + #[smi] rid: ResourceId, +) -> Result { + let raw_response = state + .borrow_mut() + .resource_table + .take::(rid)?; + let raw_response = Rc::try_unwrap(raw_response) + .expect("Someone is holding onto NodeHttpFetchResponseResource"); + + let (read, write) = tokio::io::duplex(1024); + let (read_rx, write_tx) = tokio::io::split(read); + let (mut write_rx, mut read_tx) = tokio::io::split(write); + let upgraded = raw_response.upgrade().await?; + { + // Stage 3: Pump the data + let (mut upgraded_rx, mut upgraded_tx) = + tokio::io::split(TokioIo::new(upgraded)); + + spawn(async move { + let mut buf = [0; 1024]; + loop { + let read = upgraded_rx.read(&mut buf).await?; + if read == 0 { + break; + } + read_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + spawn(async move { + let mut buf = [0; 1024]; + loop { + let read = write_rx.read(&mut buf).await?; + if read == 0 { + break; + } + upgraded_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + } + + Ok( + state + .borrow_mut() + .resource_table + .add(UpgradeStream::new(read_rx, write_tx)), + ) +} + +struct UpgradeStream { + read: AsyncRefCell>, + write: AsyncRefCell>, + cancel_handle: CancelHandle, +} + +impl UpgradeStream { + pub fn new( + read: tokio::io::ReadHalf, + write: tokio::io::WriteHalf, + ) -> Self { + Self { + read: AsyncRefCell::new(read), + write: AsyncRefCell::new(write), + cancel_handle: CancelHandle::new(), + } + } + + async fn read(self: Rc, buf: &mut [u8]) -> Result { + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let read = RcRef::map(self, |this| &this.read); + let mut read = read.borrow_mut().await; + Ok(Pin::new(&mut *read).read(buf).await?) + } + .try_or_cancel(cancel_handle) + .await + } + + async fn write(self: Rc, buf: &[u8]) -> Result { + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let write = RcRef::map(self, |this| &this.write); + let mut write = write.borrow_mut().await; + Ok(Pin::new(&mut *write).write(buf).await?) + } + .try_or_cancel(cancel_handle) + .await + } +} + +impl Resource for UpgradeStream { + fn name(&self) -> Cow { + "fetchUpgradedStream".into() + } + + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + + fn close(self: Rc) { + self.cancel_handle.cancel(); + } +} + +type BytesStream = + Pin> + Unpin>>; + +pub enum NodeHttpFetchResponseReader { + Start(http::Response), + BodyReader(Peekable), +} + +impl Default for NodeHttpFetchResponseReader { + fn default() -> Self { + let stream: BytesStream = Box::pin(deno_core::futures::stream::empty()); + Self::BodyReader(stream.peekable()) + } +} + +#[derive(Debug)] +pub struct NodeHttpFetchResponseResource { + pub response_reader: AsyncRefCell, + pub cancel: CancelHandle, + pub size: Option, +} + +impl NodeHttpFetchResponseResource { + pub fn new(response: http::Response, size: Option) -> Self { + Self { + response_reader: AsyncRefCell::new(NodeHttpFetchResponseReader::Start( + response, + )), + cancel: CancelHandle::default(), + size, + } + } + + pub async fn upgrade(self) -> Result { + let reader = self.response_reader.into_inner(); + match reader { + NodeHttpFetchResponseReader::Start(resp) => { + Ok(hyper::upgrade::on(resp).await?) + } + _ => unreachable!(), + } + } +} + +impl Resource for NodeHttpFetchResponseResource { + fn name(&self) -> Cow { + "fetchResponse".into() + } + + fn read(self: Rc, limit: usize) -> AsyncResult { + Box::pin(async move { + let mut reader = + RcRef::map(&self, |r| &r.response_reader).borrow_mut().await; + + let body = loop { + match &mut *reader { + NodeHttpFetchResponseReader::BodyReader(reader) => break reader, + NodeHttpFetchResponseReader::Start(_) => {} + } + + match std::mem::take(&mut *reader) { + NodeHttpFetchResponseReader::Start(resp) => { + let stream: BytesStream = + Box::pin(resp.into_body().into_data_stream().map(|r| { + r.map_err(|err| { + std::io::Error::new(std::io::ErrorKind::Other, err) + }) + })); + *reader = + NodeHttpFetchResponseReader::BodyReader(stream.peekable()); + } + NodeHttpFetchResponseReader::BodyReader(_) => unreachable!(), + } + }; + let fut = async move { + let mut reader = Pin::new(body); + loop { + match reader.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + let len = min(limit, chunk.len()); + let chunk = chunk.split_to(len); + break Ok(chunk.into()); + } + // This unwrap is safe because `peek_mut()` returned `Some`, and thus + // currently has a peeked value that can be synchronously returned + // from `next()`. + // + // The future returned from `next()` is always ready, so we can + // safely call `await` on it without creating a race condition. + Some(_) => match reader.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(type_error(err.to_string())), + }, + None => break Ok(BufView::empty()), + } + } + }; + + let cancel_handle = RcRef::map(self, |r| &r.cancel); + fut.try_or_cancel(cancel_handle).await + }) + } + + fn size_hint(&self) -> (u64, Option) { + (self.size.unwrap_or(0), self.size) + } + + fn close(self: Rc) { + self.cancel.cancel() + } +} + +#[allow(clippy::type_complexity)] +pub struct NodeHttpResourceToBodyAdapter( + Rc, + Option>>>>, +); + +impl NodeHttpResourceToBodyAdapter { + pub fn new(resource: Rc) -> Self { + let future = resource.clone().read(64 * 1024); + Self(resource, Some(future)) + } +} + +// SAFETY: we only use this on a single-threaded executor +unsafe impl Send for NodeHttpResourceToBodyAdapter {} +// SAFETY: we only use this on a single-threaded executor +unsafe impl Sync for NodeHttpResourceToBodyAdapter {} + +impl Stream for NodeHttpResourceToBodyAdapter { + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + if let Some(mut fut) = this.1.take() { + match fut.poll_unpin(cx) { + Poll::Pending => { + this.1 = Some(fut); + Poll::Pending + } + Poll::Ready(res) => match res { + Ok(buf) if buf.is_empty() => Poll::Ready(None), + Ok(buf) => { + this.1 = Some(this.0.clone().read(64 * 1024)); + Poll::Ready(Some(Ok(buf.to_vec().into()))) + } + Err(err) => Poll::Ready(Some(Err(err))), + }, + } + } else { + Poll::Ready(None) + } + } +} + +impl hyper::body::Body for NodeHttpResourceToBodyAdapter { + type Data = Bytes; + type Error = anyhow::Error; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match self.poll_next(cx) { + Poll::Ready(Some(res)) => Poll::Ready(Some(res.map(Frame::data))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +impl Drop for NodeHttpResourceToBodyAdapter { + fn drop(&mut self) { + self.0.clone().close() + } +} diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 51d228d4eb42b9..f4126241e169ef 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -5,8 +5,8 @@ import { core, primordials } from "ext:core/mod.js"; import { - op_fetch_response_upgrade, - op_fetch_send, + op_node_http_fetch_response_upgrade, + op_node_http_fetch_send, op_node_http_request, } from "ext:core/ops"; @@ -628,7 +628,7 @@ class ClientRequest extends OutgoingMessage { (async () => { try { - const res = await op_fetch_send(this._req.requestRid); + const res = await op_node_http_fetch_send(this._req.requestRid); if (this._req.cancelHandleRid !== null) { core.tryClose(this._req.cancelHandleRid); } @@ -676,7 +676,7 @@ class ClientRequest extends OutgoingMessage { throw new Error("not implemented CONNECT"); } - const upgradeRid = await op_fetch_response_upgrade( + const upgradeRid = await op_node_http_fetch_response_upgrade( res.responseRid, ); assert(typeof res.remoteAddrIp !== "undefined");