From 33f1c7065b268eba13e0b65063226b4a6e0ac093 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Wed, 26 Apr 2023 18:17:00 +0200 Subject: [PATCH 01/12] feat(ext/http): upgradeHttpRaw --- cli/tests/unit/serve_test.ts | 79 ++++++++++++++++++++ cli/tsc/dts/lib.deno.unstable.d.ts | 24 ++++-- ext/http/00_serve.js | 19 ++++- ext/http/01_http.js | 5 +- ext/http/http_next.rs | 116 ++++++++++++++++++++++++++++- ext/http/lib.rs | 2 +- ext/http/websocket_upgrade.rs | 17 +++-- runtime/js/90_deno_ns.js | 1 + 8 files changed, 239 insertions(+), 24 deletions(-) diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index 6158f587e6eeb6..803ea07e68586d 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -803,6 +803,85 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() { await server; }); +Deno.test( + { permissions: { net: true } }, + async function httpServerWebSocketRaw() { + const ac = new AbortController(); + const listeningPromise = deferred(); + const server = Deno.serve({ + handler: async (request) => { + const { conn, response } = Deno.upgradeHttpRaw(request); + const buf = new Uint8Array(1024); + let read; + + // Write our fake HTTP upgrade + await conn.write( + new TextEncoder().encode( + "HTTP/1.1 101 Switching Protocols\r\nConnection: Upgraded\r\n\r\nExtra", + ), + ); + + // Upgrade data + read = await conn.read(buf); + assertEquals( + new TextDecoder().decode(buf.subarray(0, read!)), + "Upgrade data", + ); + // Read the packet to echo + read = await conn.read(buf); + // Echo + await conn.write(buf.subarray(0, read!)); + + conn.close(); + return response; + }, + port: 4501, + signal: ac.signal, + onListen: onListen(listeningPromise), + onError: createOnErrorCb(ac), + }); + + await listeningPromise; + + const conn = await Deno.connect({ port: 4501 }); + await conn.write( + new TextEncoder().encode( + "GET / HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: websocket\r\n\r\nUpgrade data", + ), + ); + const buf = new Uint8Array(1024); + let len; + + // Headers + let headers = ""; + for (let i = 0; i < 2; i++) { + len = await conn.read(buf); + headers += new TextDecoder().decode(buf.subarray(0, len!)); + if (headers.endsWith("Extra")) { + break; + } + } + assertMatch( + headers, + /HTTP\/1\.1 101 Switching Protocols[ ,.A-Za-z:0-9\r\n]*Extra/im, + ); + + // Data to echo + await conn.write(new TextEncoder().encode("buffer data")); + + // Echo + len = await conn.read(buf); + assertEquals( + new TextDecoder().decode(buf.subarray(0, len!)), + "buffer data", + ); + + conn.close(); + ac.abort(); + await server; + }, +); + Deno.test( { permissions: { net: true } }, async function httpServerWebSocketUpgradeTwice() { diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index cf6cedf4142b32..b132a9aaed2ea9 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1514,7 +1514,8 @@ declare namespace Deno { */ export function upgradeHttp( request: Request, - ): Promise<[Deno.Conn, Uint8Array]>; + headers?: HeadersInit, + ): HttpUpgrade; /** **UNSTABLE**: New API, yet to be vetted. * @@ -1522,18 +1523,25 @@ declare namespace Deno { * This can be used to implement protocols that build on top of HTTP (eg. * {@linkcode WebSocket}). * - * Unlike {@linkcode Deno.upgradeHttp} this function does not require that you - * respond to the request with a {@linkcode Response} object. Instead this - * function returns the underlying connection and first packet received - * immediately, and then the caller is responsible for writing the response to - * the connection. - * * This method can only be called on requests originating the * {@linkcode Deno.serve} server. * * @category HTTP Server */ - export function upgradeHttpRaw(request: Request): [Deno.Conn, Uint8Array]; + export function upgradeHttpRaw(request: Request): HttpUpgrade; + + /** The object that is returned from a {@linkcode Deno.upgradeHttp} or {@linkcode Deno.upgradeHttpRaw} + * request. + * + * @category Web Sockets */ + export interface HttpUpgrade { + /** The response object that represents the HTTP response to the client, + * which should be used to the {@linkcode RequestEvent} `.respondWith()` for + * the upgrade to be successful. */ + response: Response; + /** The socket interface to communicate to the client via. */ + conn: Deno.Conn; + } /** **UNSTABLE**: New API, yet to be vetted. * diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 56f250d1dbd361..067daf06be7c5f 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -32,6 +32,7 @@ import { readableStreamForRid, ReadableStreamPrototype, } from "ext:deno_web/06_streams.js"; +import { TcpConn } from "ext:deno_net/01_net.js"; const { ObjectPrototypeIsPrototypeOf, SafeSet, @@ -122,10 +123,22 @@ class InnerRequest { throw "upgradeHttp is unavailable in Deno.serve at this time"; } - // upgradeHttpRaw is async - // TODO(mmastrac) + // upgradeHttpRaw is sync if (upgradeType == "upgradeHttpRaw") { - throw "upgradeHttp is unavailable in Deno.serve at this time"; + const slabId = this.#slabId; + + this.url(); + this.headerList; + this.close(); + + this.#upgraded = () => {}; + + const upgradeRid = core.ops.op_upgrade_raw(slabId); + + // TODO(mmastrac): remoteAddr + const conn = new TcpConn(upgradeRid, null, null); + + return { response: UPGRADE_RESPONSE_SENTINEL, conn }; } // upgradeWebSocket is sync diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 95e2cee7402b5b..a1f79c0a1d3ccb 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -482,14 +482,11 @@ function upgradeHttp(req) { return req[_deferred].promise; } -async function upgradeHttpRaw(req, tcpConn) { +function upgradeHttpRaw(req, tcpConn) { const inner = toInnerRequest(req); if (inner._wantsUpgrade) { return inner._wantsUpgrade("upgradeHttpRaw", arguments); } - - const res = await core.opAsync("op_http_upgrade_early", inner[streamRid]); - return new TcpConn(res, tcpConn.remoteAddr, tcpConn.localAddr); } const spaceCharCode = StringPrototypeCharCodeAt(" ", 0); diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 1c2a232e207746..ea96c194edb677 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -10,11 +10,13 @@ use crate::response_body::CompletionHandle; use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytesInner; use crate::response_body::V8StreamHttpResponseBody; +use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; use deno_core::error::AnyError; use deno_core::futures::TryFutureExt; use deno_core::op; use deno_core::AsyncRefCell; +use deno_core::AsyncResult; use deno_core::BufView; use deno_core::ByteString; use deno_core::CancelFuture; @@ -39,6 +41,7 @@ use hyper1::server::conn::http2; use hyper1::service::service_fn; use hyper1::service::HttpService; use hyper1::upgrade::OnUpgrade; +use hyper1::upgrade::Upgraded; use hyper1::StatusCode; use pin_project::pin_project; use pin_project::pinned_drop; @@ -52,6 +55,10 @@ use std::net::SocketAddr; use std::net::SocketAddrV4; use std::pin::Pin; use std::rc::Rc; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; +use tokio::io::ReadBuf; use tokio::task::spawn_local; use tokio::task::JoinHandle; @@ -228,7 +235,76 @@ fn slab_insert( } #[op] -pub fn op_upgrade_raw(_index: usize) {} +pub fn op_upgrade_raw( + state: &mut OpState, + index: u32, +) -> Result { + // Stage 1: extract the upgrade future + let upgrade = with_http_mut(index, |http| { + // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit + http + .request_parts + .extensions + .remove::() + .ok_or_else(|| AnyError::msg("upgrade unavailable")) + })?; + + let (read, write) = tokio::io::duplex(1024); + let (mut read_rx, mut write_tx) = tokio::io::split(read); + let (mut write_rx, mut read_tx) = tokio::io::split(write); + + spawn_local(async move { + let mut upgrade_stream = WebSocketUpgrade::::default(); + let mut buf = [0; 1024]; + let upgraded = loop { + let read = Pin::new(&mut write_rx).read(&mut buf).await?; + match upgrade_stream.write(&buf[..read]) { + Ok(None) => continue, + Ok(Some((response, bytes))) => { + with_resp_mut(index, |resp| *resp = Some(response)); + with_promise_mut(index, |promise| promise.complete(true)); + let mut upgraded = upgrade.await?; + upgraded.write_all(&bytes).await?; + break upgraded; + } + Err(err) => return Err(err), + } + }; + + let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); + + spawn_local(async move { + let mut buf = [0; 1024]; + loop { + let read = upgraded_rx.read(&mut buf).await?; + if read == 0 { + break; + } + read_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + spawn_local(async move { + let mut buf = [0; 1024]; + loop { + let read = write_rx.read(&mut buf).await?; + if read == 0 { + break; + } + upgraded_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + + Ok(()) + }); + + Ok( + state + .resource_table + .add(UpgradeStream::new(read_rx, write_tx)), + ) +} #[op] pub async fn op_upgrade( @@ -825,3 +901,41 @@ pub async fn op_http_wait( Ok(u32::MAX) } + +struct UpgradeStream { + read: AsyncRefCell>, + write: AsyncRefCell>, +} + +impl UpgradeStream { + pub fn new( + read: tokio::io::ReadHalf, + write: tokio::io::WriteHalf, + ) -> Self { + Self { + read: AsyncRefCell::new(read), + write: AsyncRefCell::new(write), + } + } + + async fn read(self: Rc, buf: &mut [u8]) -> Result { + let read = RcRef::map(self, |this| &this.read); + let mut read = read.borrow_mut().await; + Ok(Pin::new(&mut *read).read(buf).await?) + } + + async fn write(self: Rc, buf: &[u8]) -> Result { + let write = RcRef::map(self, |this| &this.write); + let mut write = write.borrow_mut().await; + Ok(Pin::new(&mut *write).write(buf).await?) + } +} + +impl Resource for UpgradeStream { + fn name(&self) -> Cow { + "httpRawUpgradeStream".into() + } + + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); +} diff --git a/ext/http/lib.rs b/ext/http/lib.rs index d5404d189aea26..eaafe7c69af2ce 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -972,7 +972,7 @@ struct EarlyUpgradeSocket(AsyncRefCell, CancelHandle); enum EarlyUpgradeSocketInner { PreResponse( Rc, - WebSocketUpgrade, + WebSocketUpgrade, // Readers need to block in this state, so they can wait here for the broadcast. tokio::sync::broadcast::Sender< Rc>>, diff --git a/ext/http/websocket_upgrade.rs b/ext/http/websocket_upgrade.rs index 042a467219a235..70ad78526791b0 100644 --- a/ext/http/websocket_upgrade.rs +++ b/ext/http/websocket_upgrade.rs @@ -1,12 +1,13 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use std::marker::PhantomData; + use bytes::Bytes; use bytes::BytesMut; use deno_core::error::AnyError; use httparse::Status; use hyper::http::HeaderName; use hyper::http::HeaderValue; -use hyper::Body; use hyper::Response; use memmem::Searcher; use memmem::TwoWaySearcher; @@ -15,14 +16,14 @@ use once_cell::sync::OnceCell; use crate::http_error; /// Given a buffer that ends in `\n\n` or `\r\n\r\n`, returns a parsed [`Request`]. -fn parse_response( +fn parse_response( header_bytes: &[u8], -) -> Result<(usize, Response), AnyError> { +) -> Result<(usize, Response), AnyError> { let mut headers = [httparse::EMPTY_HEADER; 16]; let status = httparse::parse_headers(header_bytes, &mut headers)?; match status { Status::Complete((index, parsed)) => { - let mut resp = Response::builder().status(101).body(Body::empty())?; + let mut resp = Response::builder().status(101).body(T::default())?; for header in parsed.iter() { resp.headers_mut().append( HeaderName::from_bytes(header.name.as_bytes())?, @@ -59,12 +60,13 @@ static HEADER_SEARCHER: OnceCell = OnceCell::new(); static HEADER_SEARCHER2: OnceCell = OnceCell::new(); #[derive(Default)] -pub struct WebSocketUpgrade { +pub struct WebSocketUpgrade { state: WebSocketUpgradeState, buf: BytesMut, + _t: PhantomData, } -impl WebSocketUpgrade { +impl WebSocketUpgrade { /// Ensures that the status line starts with "HTTP/1.1 101 " which matches all of the node.js /// WebSocket libraries that are known. We don't care about the trailing status text. fn validate_status(&self, status: &[u8]) -> Result<(), AnyError> { @@ -80,7 +82,7 @@ impl WebSocketUpgrade { pub fn write( &mut self, bytes: &[u8], - ) -> Result, Bytes)>, AnyError> { + ) -> Result, Bytes)>, AnyError> { use WebSocketUpgradeState::*; match self.state { @@ -153,6 +155,7 @@ impl WebSocketUpgrade { #[cfg(test)] mod tests { use super::*; + use hyper::Body; type ExpectedResponseAndHead = Option<(Response, &'static [u8])>; diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index bb6ba3b08d37c7..7f553701783675 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -171,6 +171,7 @@ const denoNsUnstable = { funlock: fs.funlock, funlockSync: fs.funlockSync, upgradeHttp: http.upgradeHttp, + upgradeHttpRaw: http.upgradeHttpRaw, serve: http.serve, openKv: kv.openKv, Kv: kv.Kv, From 2e45e950fdb3dabca11ccfbde44513aef4faa133 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Wed, 26 Apr 2023 18:38:31 +0200 Subject: [PATCH 02/12] feat(ext/http): upgradeHttpRaw --- cli/bench/testdata/deno_upgrade_http.js | 12 ------------ cli/tests/unit/serve_test.ts | 7 ++++++- cli/tsc/dts/lib.deno.unstable.d.ts | 26 ------------------------- ext/http/00_serve.js | 21 +++++++++++++++++--- ext/http/01_http.js | 9 +-------- ext/http/http_next.rs | 11 +++++++---- runtime/js/90_deno_ns.js | 1 - 7 files changed, 32 insertions(+), 55 deletions(-) delete mode 100644 cli/bench/testdata/deno_upgrade_http.js diff --git a/cli/bench/testdata/deno_upgrade_http.js b/cli/bench/testdata/deno_upgrade_http.js deleted file mode 100644 index ca553341132052..00000000000000 --- a/cli/bench/testdata/deno_upgrade_http.js +++ /dev/null @@ -1,12 +0,0 @@ -const { serve, upgradeHttpRaw } = Deno; -const u8 = Deno[Deno.internal].core.encode( - "HTTP/1.1 101 Switching Protocols\r\n\r\n", -); - -async function handler(req) { - const [conn, _firstPacket] = upgradeHttpRaw(req); - await conn.write(u8); - await conn.close(); -} - -serve(handler, { hostname: "127.0.0.1", port: 9000 }); diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index 803ea07e68586d..5d5d0428f9011d 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -17,6 +17,11 @@ import { } from "./test_util.ts"; import { consoleSize } from "../../../runtime/js/40_tty.js"; +const { + upgradeHttpRaw, + // @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol +} = Deno[Deno.internal]; + function createOnErrorCb(ac: AbortController): (err: unknown) => Response { return (err) => { console.error(err); @@ -810,7 +815,7 @@ Deno.test( const listeningPromise = deferred(); const server = Deno.serve({ handler: async (request) => { - const { conn, response } = Deno.upgradeHttpRaw(request); + const { conn, response } = upgradeHttpRaw(request); const buf = new Uint8Array(1024); let read; diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index b132a9aaed2ea9..2e5548932e7a69 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1517,32 +1517,6 @@ declare namespace Deno { headers?: HeadersInit, ): HttpUpgrade; - /** **UNSTABLE**: New API, yet to be vetted. - * - * Allows "hijacking" the connection that the request is associated with. - * This can be used to implement protocols that build on top of HTTP (eg. - * {@linkcode WebSocket}). - * - * This method can only be called on requests originating the - * {@linkcode Deno.serve} server. - * - * @category HTTP Server - */ - export function upgradeHttpRaw(request: Request): HttpUpgrade; - - /** The object that is returned from a {@linkcode Deno.upgradeHttp} or {@linkcode Deno.upgradeHttpRaw} - * request. - * - * @category Web Sockets */ - export interface HttpUpgrade { - /** The response object that represents the HTTP response to the client, - * which should be used to the {@linkcode RequestEvent} `.respondWith()` for - * the upgrade to be successful. */ - response: Response; - /** The socket interface to communicate to the client via. */ - conn: Deno.Conn; - } - /** **UNSTABLE**: New API, yet to be vetted. * * Open a new {@linkcode Deno.Kv} connection to persist data. diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 067daf06be7c5f..3ae79c85d6369f 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -1,6 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. const core = globalThis.Deno.core; const primordials = globalThis.__bootstrap.primordials; +const internals = globalThis.__bootstrap.internals; const { BadResourcePrototype } = core; import { InnerBody } from "ext:deno_fetch/22_body.js"; @@ -10,7 +11,7 @@ import { newInnerResponse, toInnerResponse, } from "ext:deno_fetch/23_response.js"; -import { fromInnerRequest } from "ext:deno_fetch/23_request.js"; +import { fromInnerRequest, toInnerRequest } from "ext:deno_fetch/23_request.js"; import { AbortController } from "ext:deno_web/03_abort_signal.js"; import { _eventLoop, @@ -83,6 +84,14 @@ const UPGRADE_RESPONSE_SENTINEL = fromInnerResponse( "immutable", ); +function upgradeHttpRaw(req, conn) { + const inner = toInnerRequest(req); + if (inner._wantsUpgrade) { + return inner._wantsUpgrade("upgradeHttpRaw", conn); + } + throw new TypeError("upgradeHttpRaw may only be used with Deno.serve"); +} + class InnerRequest { #slabId; #context; @@ -126,6 +135,7 @@ class InnerRequest { // upgradeHttpRaw is sync if (upgradeType == "upgradeHttpRaw") { const slabId = this.#slabId; + const underlyingConn = originalArgs[0]; this.url(); this.headerList; @@ -135,8 +145,11 @@ class InnerRequest { const upgradeRid = core.ops.op_upgrade_raw(slabId); - // TODO(mmastrac): remoteAddr - const conn = new TcpConn(upgradeRid, null, null); + const conn = new TcpConn( + upgradeRid, + underlyingConn?.remoteAddr, + underlyingConn?.localAddr, + ); return { response: UPGRADE_RESPONSE_SENTINEL, conn }; } @@ -626,4 +639,6 @@ async function serve(arg1, arg2) { } } +internals.upgradeHttpRaw = upgradeHttpRaw; + export { serve }; diff --git a/ext/http/01_http.js b/ext/http/01_http.js index a1f79c0a1d3ccb..19e40197ed3484 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -482,13 +482,6 @@ function upgradeHttp(req) { return req[_deferred].promise; } -function upgradeHttpRaw(req, tcpConn) { - const inner = toInnerRequest(req); - if (inner._wantsUpgrade) { - return inner._wantsUpgrade("upgradeHttpRaw", arguments); - } -} - const spaceCharCode = StringPrototypeCharCodeAt(" ", 0); const tabCharCode = StringPrototypeCharCodeAt("\t", 0); const commaCharCode = StringPrototypeCharCodeAt(",", 0); @@ -563,4 +556,4 @@ function buildCaseInsensitiveCommaValueFinder(checkText) { internals.buildCaseInsensitiveCommaValueFinder = buildCaseInsensitiveCommaValueFinder; -export { _ws, HttpConn, serve, upgradeHttp, upgradeHttpRaw, upgradeWebSocket }; +export { _ws, HttpConn, serve, upgradeHttp, upgradeWebSocket }; diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index ea96c194edb677..9a04e6d72de155 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -41,7 +41,7 @@ use hyper1::server::conn::http2; use hyper1::service::service_fn; use hyper1::service::HttpService; use hyper1::upgrade::OnUpgrade; -use hyper1::upgrade::Upgraded; + use hyper1::StatusCode; use pin_project::pin_project; use pin_project::pinned_drop; @@ -55,10 +55,10 @@ use std::net::SocketAddr; use std::net::SocketAddrV4; use std::pin::Pin; use std::rc::Rc; -use tokio::io::AsyncRead; + use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; -use tokio::io::ReadBuf; + use tokio::task::spawn_local; use tokio::task::JoinHandle; @@ -250,11 +250,13 @@ pub fn op_upgrade_raw( })?; let (read, write) = tokio::io::duplex(1024); - let (mut read_rx, mut write_tx) = tokio::io::split(read); + let (read_rx, write_tx) = tokio::io::split(read); let (mut write_rx, mut read_tx) = tokio::io::split(write); spawn_local(async move { let mut upgrade_stream = WebSocketUpgrade::::default(); + + // Stage 2: Extract the Upgraded connection let mut buf = [0; 1024]; let upgraded = loop { let read = Pin::new(&mut write_rx).read(&mut buf).await?; @@ -271,6 +273,7 @@ pub fn op_upgrade_raw( } }; + // Stage 3: Pump the data let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); spawn_local(async move { diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index 7f553701783675..bb6ba3b08d37c7 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -171,7 +171,6 @@ const denoNsUnstable = { funlock: fs.funlock, funlockSync: fs.funlockSync, upgradeHttp: http.upgradeHttp, - upgradeHttpRaw: http.upgradeHttpRaw, serve: http.serve, openKv: kv.openKv, Kv: kv.Kv, From ec4c684e67063e0732f6cb62f25e022350ab61dd Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Wed, 26 Apr 2023 18:47:29 +0200 Subject: [PATCH 03/12] Remove non-working early upgrade stream --- ext/http/lib.rs | 190 ------------------------------------------------ 1 file changed, 190 deletions(-) diff --git a/ext/http/lib.rs b/ext/http/lib.rs index eaafe7c69af2ce..cde15af88ceb7c 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -32,7 +32,6 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::StringOrBuffer; -use deno_core::WriteOutcome; use deno_core::ZeroCopyBuf; use deno_net::raw::NetworkStream; use deno_websocket::ws_create_server_stream; @@ -67,11 +66,9 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; -use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::task::spawn_local; -use websocket_upgrade::WebSocketUpgrade; use crate::network_buffered_stream::NetworkBufferedStream; use crate::reader_stream::ExternallyAbortableReaderStream; @@ -97,7 +94,6 @@ deno_core::extension!( op_http_write_resource, op_http_shutdown, op_http_websocket_accept_header, - op_http_upgrade_early, op_http_upgrade_websocket, http_next::op_serve_http, http_next::op_serve_http_on, @@ -967,192 +963,6 @@ fn op_http_websocket_accept_header(key: String) -> Result { Ok(base64::encode(digest)) } -struct EarlyUpgradeSocket(AsyncRefCell, CancelHandle); - -enum EarlyUpgradeSocketInner { - PreResponse( - Rc, - WebSocketUpgrade, - // Readers need to block in this state, so they can wait here for the broadcast. - tokio::sync::broadcast::Sender< - Rc>>, - >, - ), - PostResponse( - Rc>>, - Rc>>, - ), -} - -impl EarlyUpgradeSocket { - /// Gets a reader without holding the lock. - async fn get_reader( - self: Rc, - ) -> Result< - Rc>>, - AnyError, - > { - let mut borrow = RcRef::map(self.clone(), |x| &x.0).borrow_mut().await; - let cancel = RcRef::map(self, |x| &x.1); - let inner = &mut *borrow; - match inner { - EarlyUpgradeSocketInner::PreResponse(_, _, tx) => { - let mut rx = tx.subscribe(); - // Ensure we're not borrowing self here - drop(borrow); - Ok( - rx.recv() - .map_err(AnyError::from) - .try_or_cancel(&cancel) - .await?, - ) - } - EarlyUpgradeSocketInner::PostResponse(rx, _) => Ok(rx.clone()), - } - } - - async fn read(self: Rc, data: &mut [u8]) -> Result { - let reader = self.clone().get_reader().await?; - let cancel = RcRef::map(self, |x| &x.1); - Ok( - reader - .borrow_mut() - .await - .read(data) - .try_or_cancel(&cancel) - .await?, - ) - } - - /// Write all the data provided, only holding the lock while we see if the connection needs to be - /// upgraded. - async fn write_all(self: Rc, buf: &[u8]) -> Result<(), AnyError> { - let mut borrow = RcRef::map(self.clone(), |x| &x.0).borrow_mut().await; - let cancel = RcRef::map(self, |x| &x.1); - let inner = &mut *borrow; - match inner { - EarlyUpgradeSocketInner::PreResponse(stream, upgrade, rx_tx) => { - if let Some((resp, extra)) = upgrade.write(buf)? { - let new_wr = HttpResponseWriter::Closed; - let mut old_wr = - RcRef::map(stream.clone(), |r| &r.wr).borrow_mut().await; - let response_tx = match replace(&mut *old_wr, new_wr) { - HttpResponseWriter::Headers(response_tx) => response_tx, - _ => return Err(http_error("response headers already sent")), - }; - - if response_tx.send(resp).is_err() { - stream.conn.closed().await?; - return Err(http_error("connection closed while sending response")); - }; - - let mut old_rd = - RcRef::map(stream.clone(), |r| &r.rd).borrow_mut().await; - let new_rd = HttpRequestReader::Closed; - let upgraded = match replace(&mut *old_rd, new_rd) { - HttpRequestReader::Headers(request) => { - hyper::upgrade::on(request) - .map_err(AnyError::from) - .try_or_cancel(&cancel) - .await? - } - _ => { - return Err(http_error("response already started")); - } - }; - - let (rx, tx) = tokio::io::split(upgraded); - let rx = Rc::new(AsyncRefCell::new(rx)); - let tx = Rc::new(AsyncRefCell::new(tx)); - - // Take the tx and rx lock before we allow anything else to happen because we want to control - // the order of reads and writes. - let mut tx_lock = tx.clone().borrow_mut().await; - let rx_lock = rx.clone().borrow_mut().await; - - // Allow all the pending readers to go now. We still have the lock on inner, so no more - // pending readers can show up. We intentionally ignore errors here, as there may be - // nobody waiting on a read. - _ = rx_tx.send(rx.clone()); - - // We swap out inner here, so once the lock is gone, readers will acquire rx directly. - // We also fully release our lock. - *inner = EarlyUpgradeSocketInner::PostResponse(rx, tx); - drop(borrow); - - // We've updated inner and unlocked it, reads are free to go in-order. - drop(rx_lock); - - // If we had extra data after the response, write that to the upgraded connection - if !extra.is_empty() { - tx_lock.write_all(&extra).try_or_cancel(&cancel).await?; - } - } - } - EarlyUpgradeSocketInner::PostResponse(_, tx) => { - let tx = tx.clone(); - drop(borrow); - tx.borrow_mut() - .await - .write_all(buf) - .try_or_cancel(&cancel) - .await?; - } - }; - Ok(()) - } -} - -impl Resource for EarlyUpgradeSocket { - fn name(&self) -> Cow { - "upgradedHttpConnection".into() - } - - deno_core::impl_readable_byob!(); - - fn write( - self: Rc, - buf: BufView, - ) -> AsyncResult { - Box::pin(async move { - let nwritten = buf.len(); - Self::write_all(self, &buf).await?; - Ok(WriteOutcome::Full { nwritten }) - }) - } - - fn write_all(self: Rc, buf: BufView) -> AsyncResult<()> { - Box::pin(async move { Self::write_all(self, &buf).await }) - } - - fn close(self: Rc) { - self.1.cancel() - } -} - -#[op] -async fn op_http_upgrade_early( - state: Rc>, - rid: ResourceId, -) -> Result { - let stream = state - .borrow_mut() - .resource_table - .get::(rid)?; - let resources = &mut state.borrow_mut().resource_table; - let (tx, _rx) = tokio::sync::broadcast::channel(1); - let socket = EarlyUpgradeSocketInner::PreResponse( - stream, - WebSocketUpgrade::default(), - tx, - ); - let rid = resources.add(EarlyUpgradeSocket( - AsyncRefCell::new(socket), - CancelHandle::new(), - )); - Ok(rid) -} - #[op] async fn op_http_upgrade_websocket( state: Rc>, From 1ee7d15657e098da99733195020c8a2080218dbc Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Wed, 26 Apr 2023 19:09:40 +0200 Subject: [PATCH 04/12] Cancel connection, move node code to internal upgradeHttpRaw method --- ext/http/00_serve.js | 2 +- ext/http/http_next.rs | 28 ++++++++++++++++++++++------ ext/node/polyfills/http.ts | 3 ++- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 3ae79c85d6369f..73e444e1913725 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -641,4 +641,4 @@ async function serve(arg1, arg2) { internals.upgradeHttpRaw = upgradeHttpRaw; -export { serve }; +export { serve, upgradeHttpRaw }; diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 9a04e6d72de155..593a9c8166949a 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -908,6 +908,7 @@ pub async fn op_http_wait( struct UpgradeStream { read: AsyncRefCell>, write: AsyncRefCell>, + cancel_handle: CancelHandle, } impl UpgradeStream { @@ -918,19 +919,30 @@ impl UpgradeStream { Self { read: AsyncRefCell::new(read), write: AsyncRefCell::new(write), + cancel_handle: CancelHandle::new(), } } async fn read(self: Rc, buf: &mut [u8]) -> Result { - let read = RcRef::map(self, |this| &this.read); - let mut read = read.borrow_mut().await; - Ok(Pin::new(&mut *read).read(buf).await?) + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let read = RcRef::map(self, |this| &this.read); + let mut read = read.borrow_mut().await; + Ok(Pin::new(&mut *read).read(buf).await?) + } + .try_or_cancel(cancel_handle) + .await } async fn write(self: Rc, buf: &[u8]) -> Result { - let write = RcRef::map(self, |this| &this.write); - let mut write = write.borrow_mut().await; - Ok(Pin::new(&mut *write).write(buf).await?) + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let write = RcRef::map(self, |this| &this.write); + let mut write = write.borrow_mut().await; + Ok(Pin::new(&mut *write).write(buf).await?) + } + .try_or_cancel(cancel_handle) + .await } } @@ -941,4 +953,8 @@ impl Resource for UpgradeStream { deno_core::impl_readable_byob!(); deno_core::impl_writable!(); + + fn close(self: Rc) { + self.cancel_handle.cancel(); + } } diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index d8ec7650bc534b..2510f7cb4401f8 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -16,6 +16,7 @@ import { Agent } from "ext:deno_node/_http_agent.mjs"; import { chunkExpression as RE_TE_CHUNKED } from "ext:deno_node/_http_common.ts"; import { urlToHttpOptions } from "ext:deno_node/internal/url.ts"; import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts"; +import { upgradeHttpRaw } from "ext:deno_http/00_serve.js"; import * as denoHttp from "ext:deno_http/01_http.js"; import * as httpRuntime from "ext:runtime/40_http.js"; import { connResetException } from "ext:deno_node/internal/errors.ts"; @@ -704,7 +705,7 @@ class ServerImpl extends EventEmitter { } const req = new IncomingMessageForServer(reqEvent.request, tcpConn); if (req.upgrade && this.listenerCount("upgrade") > 0) { - const conn = await denoHttp.upgradeHttpRaw( + const conn = await upgradeHttpRaw( reqEvent.request, tcpConn, ) as Deno.Conn; From de78429338c3cd2ef7dbc9a9ae6a250f0a29b9c0 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Wed, 26 Apr 2023 19:11:01 +0200 Subject: [PATCH 05/12] Restore return value of upgradeHttp --- cli/tsc/dts/lib.deno.unstable.d.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index 2e5548932e7a69..0e497789cb81c4 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1514,8 +1514,7 @@ declare namespace Deno { */ export function upgradeHttp( request: Request, - headers?: HeadersInit, - ): HttpUpgrade; + ): Promise<[Deno.Conn, Uint8Array]>; /** **UNSTABLE**: New API, yet to be vetted. * From 289020780e51f1865409cedd8814b5e31645a6c2 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Wed, 26 Apr 2023 19:11:17 +0200 Subject: [PATCH 06/12] [ci] From 036e446e2fb2551ff0a66e63ee670368bc8382b3 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Wed, 26 Apr 2023 19:12:14 +0200 Subject: [PATCH 07/12] [ci] lint --- ext/http/01_http.js | 1 - 1 file changed, 1 deletion(-) diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 19e40197ed3484..0048eedebb546a 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -64,7 +64,6 @@ const { } = primordials; const connErrorSymbol = Symbol("connError"); -const streamRid = Symbol("streamRid"); const _deferred = Symbol("upgradeHttpDeferred"); class HttpConn { From 07d352bc4c843bbfd35b53ce41e5cfbf08dccdef Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Wed, 26 Apr 2023 19:12:38 +0200 Subject: [PATCH 08/12] [ci] lint --- ext/node/polyfills/http.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 2510f7cb4401f8..785bbaab3a9935 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -17,7 +17,6 @@ import { chunkExpression as RE_TE_CHUNKED } from "ext:deno_node/_http_common.ts" import { urlToHttpOptions } from "ext:deno_node/internal/url.ts"; import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts"; import { upgradeHttpRaw } from "ext:deno_http/00_serve.js"; -import * as denoHttp from "ext:deno_http/01_http.js"; import * as httpRuntime from "ext:runtime/40_http.js"; import { connResetException } from "ext:deno_node/internal/errors.ts"; From 5b198d4de7ccca42b31754233f1f07e91e28e5ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 26 Apr 2023 22:56:11 +0200 Subject: [PATCH 09/12] Revert "refactor(ext/node): migrate "http" module to use "Deno.serveHttp" API (#18552)" This reverts commit 3cd7abf73fa104526508984daef54bbb8e120310. --- ext/node/polyfills/http.ts | 177 ++++++++++++++++++++++++++++++------- 1 file changed, 145 insertions(+), 32 deletions(-) diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 785bbaab3a9935..11e7e817d8815b 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -1,6 +1,9 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +const core = globalThis.Deno.core; +const ops = core.ops; import { TextEncoder } from "ext:deno_web/08_text_encoding.js"; +import { type Deferred, deferred } from "ext:deno_node/_util/async.ts"; import { _normalizeArgs, ListenOptions, Socket } from "ext:deno_node/net.ts"; import { Buffer } from "ext:deno_node/buffer.ts"; import { ERR_SERVER_NOT_RUNNING } from "ext:deno_node/internal/errors.ts"; @@ -19,6 +22,7 @@ import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts"; import { upgradeHttpRaw } from "ext:deno_http/00_serve.js"; import * as httpRuntime from "ext:runtime/40_http.js"; import { connResetException } from "ext:deno_node/internal/errors.ts"; +import * as flash from "ext:deno_flash/01_http.js"; enum STATUS_CODES { /** RFC 7231, 6.2.1 */ @@ -190,6 +194,9 @@ const METHODS = [ type Chunk = string | Buffer | Uint8Array; +const DenoServe = flash.createServe(ops.op_node_unstable_flash_serve); +const DenoUpgradeHttpRaw = flash.upgradeHttpRaw; + const ENCODER = new TextEncoder(); export interface RequestOptions { @@ -427,7 +434,11 @@ export class ServerResponse extends NodeWritable { finished = false; headersSent = false; #firstChunk: Chunk | null = null; + // Used if --unstable flag IS NOT present #reqEvent?: Deno.RequestEvent; + // Used if --unstable flag IS present + #resolve?: (value: Response | PromiseLike) => void; + #isFlashRequest: boolean; static #enqueue(controller: ReadableStreamDefaultController, chunk: Chunk) { if (typeof chunk === "string") { @@ -443,7 +454,10 @@ export class ServerResponse extends NodeWritable { return status === 101 || status === 204 || status === 205 || status === 304; } - constructor(reqEvent: undefined | Deno.RequestEvent) { + constructor( + reqEvent: undefined | Deno.RequestEvent, + resolve: undefined | ((value: Response | PromiseLike) => void), + ) { let controller: ReadableByteStreamController; const readable = new ReadableStream({ start(c) { @@ -485,7 +499,9 @@ export class ServerResponse extends NodeWritable { }, }); this.#readable = readable; + this.#resolve = resolve; this.#reqEvent = reqEvent; + this.#isFlashRequest = typeof resolve !== "undefined"; } setHeader(name: string, value: string) { @@ -521,8 +537,9 @@ export class ServerResponse extends NodeWritable { this.statusCode = 200; this.statusMessage = "OK"; } + // Only taken if --unstable IS NOT present if ( - typeof singleChunk === "string" && + !this.#isFlashRequest && typeof singleChunk === "string" && !this.hasHeader("content-type") ) { this.setHeader("content-type", "text/plain;charset=UTF-8"); @@ -536,22 +553,35 @@ export class ServerResponse extends NodeWritable { if (ServerResponse.#bodyShouldBeNull(this.statusCode!)) { body = null; } - this.#reqEvent!.respondWith( - new Response(body, { - headers: this.#headers, - status: this.statusCode, - statusText: this.statusMessage, - }), - ).catch(() => { - // TODO(bartlomieju): this error should be handled somehow - // ignore this error - }); + if (this.#isFlashRequest) { + this.#resolve!( + new Response(body, { + headers: this.#headers, + status: this.statusCode, + statusText: this.statusMessage, + }), + ); + } else { + this.#reqEvent!.respondWith( + new Response(body, { + headers: this.#headers, + status: this.statusCode, + statusText: this.statusMessage, + }), + ).catch(() => { + // ignore this error + }); + } } // deno-lint-ignore no-explicit-any override end(chunk?: any, encoding?: any, cb?: any): this { this.finished = true; - if (!chunk && this.#headers.has("transfer-encoding")) { + if (this.#isFlashRequest) { + // Flash sets both of these headers. + this.#headers.delete("transfer-encoding"); + this.#headers.delete("content-length"); + } else if (!chunk && this.#headers.has("transfer-encoding")) { // FIXME(bnoordhuis) Node sends a zero length chunked body instead, i.e., // the trailing "0\r\n", but respondWith() just hangs when I try that. this.#headers.set("content-length", "0"); @@ -646,12 +676,25 @@ export function Server(handler?: ServerHandler): ServerImpl { } class ServerImpl extends EventEmitter { + #isFlashServer: boolean; + #httpConnections: Set = new Set(); #listener?: Deno.Listener; + + #addr?: Deno.NetAddr; + #hasClosed = false; + #ac?: AbortController; + #servePromise?: Deferred; listening = false; constructor(handler?: ServerHandler) { super(); + // @ts-ignore Might be undefined without `--unstable` flag + this.#isFlashServer = typeof DenoServe == "function"; + if (this.#isFlashServer) { + this.#servePromise = deferred(); + this.#servePromise.then(() => this.emit("close")); + } if (handler !== undefined) { this.on("request", handler); } @@ -676,16 +719,26 @@ class ServerImpl extends EventEmitter { // TODO(bnoordhuis) Node prefers [::] when host is omitted, // we on the other hand default to 0.0.0.0. - this.listening = true; - const hostname = options.host ?? ""; - this.#listener = Deno.listen({ port, hostname }); - nextTick(() => this.#listenLoop()); + if (this.#isFlashServer) { + const hostname = options.host ?? "0.0.0.0"; + this.#addr = { + hostname, + port, + } as Deno.NetAddr; + this.listening = true; + nextTick(() => this.#serve()); + } else { + this.listening = true; + const hostname = options.host ?? ""; + this.#listener = Deno.listen({ port, hostname }); + nextTick(() => this.#listenLoop()); + } return this; } async #listenLoop() { - const go = async (tcpConn: Deno.Conn, httpConn: Deno.HttpConn) => { + const go = async (httpConn: Deno.HttpConn) => { try { for (;;) { let reqEvent = null; @@ -702,6 +755,7 @@ class ServerImpl extends EventEmitter { if (reqEvent === null) { break; } + // HEAD const req = new IncomingMessageForServer(reqEvent.request, tcpConn); if (req.upgrade && this.listenerCount("upgrade") > 0) { const conn = await upgradeHttpRaw( @@ -717,6 +771,11 @@ class ServerImpl extends EventEmitter { const res = new ServerResponse(reqEvent); this.emit("request", req, res); } + // + const req = new IncomingMessageForServer(reqEvent.request); + const res = new ServerResponse(reqEvent, undefined); + this.emit("request", req, res); + //parent of 3cd7abf73 (refactor(ext/node): migrate "http" module to use "Deno.serveHttp" API (#18552)) } } finally { this.#httpConnections.delete(httpConn); @@ -731,17 +790,56 @@ class ServerImpl extends EventEmitter { for await (const conn of listener) { let httpConn: Deno.HttpConn; try { - httpConn = httpRuntime.serveHttp(conn); + httpConn = Deno.serveHttp(conn); } catch { continue; /// Connection closed. } this.#httpConnections.add(httpConn); - go(conn, httpConn); + go(httpConn); } } } + #serve() { + const ac = new AbortController(); + const handler = (request: Request) => { + const req = new IncomingMessageForServer(request); + if (req.upgrade && this.listenerCount("upgrade") > 0) { + const [conn, head] = DenoUpgradeHttpRaw(request) as [ + Deno.Conn, + Uint8Array, + ]; + const socket = new Socket({ + handle: new TCP(constants.SERVER, conn), + }); + this.emit("upgrade", req, socket, Buffer.from(head)); + } else { + return new Promise((resolve): void => { + const res = new ServerResponse(undefined, resolve); + this.emit("request", req, res); + }); + } + }; + + if (this.#hasClosed) { + return; + } + this.#ac = ac; + DenoServe( + { + handler: handler as Deno.ServeHandler, + ...this.#addr, + signal: ac.signal, + // @ts-ignore Might be any without `--unstable` flag + onListen: ({ port }) => { + this.#addr!.port = port; + this.emit("listening"); + }, + }, + ).then(() => this.#servePromise!.resolve()); + } + setTimeout() { console.error("Not implemented: Server.setTimeout()"); } @@ -750,6 +848,7 @@ class ServerImpl extends EventEmitter { const listening = this.listening; this.listening = false; + this.#hasClosed = true; if (typeof cb === "function") { if (listening) { this.once("close", cb); @@ -760,28 +859,42 @@ class ServerImpl extends EventEmitter { } } - nextTick(() => this.emit("close")); + if (this.#isFlashServer) { + if (listening && this.#ac) { + this.#ac.abort(); + this.#ac = undefined; + } else { + this.#servePromise!.resolve(); + } + } else { + nextTick(() => this.emit("close")); - if (listening) { - this.#listener!.close(); - this.#listener = undefined; + if (listening) { + this.#listener!.close(); + this.#listener = undefined; - for (const httpConn of this.#httpConnections) { - try { - httpConn.close(); - } catch { - // Already closed. + for (const httpConn of this.#httpConnections) { + try { + httpConn.close(); + } catch { + // Already closed. + } } - } - this.#httpConnections.clear(); + this.#httpConnections.clear(); + } } return this; } address() { - const addr = this.#listener!.addr as Deno.NetAddr; + let addr; + if (this.#isFlashServer) { + addr = this.#addr!; + } else { + addr = this.#listener!.addr as Deno.NetAddr; + } return { port: addr.port, address: addr.hostname, From 8b7ba39f04e2a856fccc1baa422c87d2a7f0de8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 26 Apr 2023 23:42:11 +0200 Subject: [PATCH 10/12] migrate back to Deno.serve API --- ext/node/polyfills/http.ts | 213 +++++++------------------------------ 1 file changed, 39 insertions(+), 174 deletions(-) diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 11e7e817d8815b..749503228e2b59 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -19,10 +19,8 @@ import { Agent } from "ext:deno_node/_http_agent.mjs"; import { chunkExpression as RE_TE_CHUNKED } from "ext:deno_node/_http_common.ts"; import { urlToHttpOptions } from "ext:deno_node/internal/url.ts"; import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts"; -import { upgradeHttpRaw } from "ext:deno_http/00_serve.js"; -import * as httpRuntime from "ext:runtime/40_http.js"; import { connResetException } from "ext:deno_node/internal/errors.ts"; -import * as flash from "ext:deno_flash/01_http.js"; +import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.js"; enum STATUS_CODES { /** RFC 7231, 6.2.1 */ @@ -194,9 +192,6 @@ const METHODS = [ type Chunk = string | Buffer | Uint8Array; -const DenoServe = flash.createServe(ops.op_node_unstable_flash_serve); -const DenoUpgradeHttpRaw = flash.upgradeHttpRaw; - const ENCODER = new TextEncoder(); export interface RequestOptions { @@ -434,11 +429,7 @@ export class ServerResponse extends NodeWritable { finished = false; headersSent = false; #firstChunk: Chunk | null = null; - // Used if --unstable flag IS NOT present - #reqEvent?: Deno.RequestEvent; - // Used if --unstable flag IS present - #resolve?: (value: Response | PromiseLike) => void; - #isFlashRequest: boolean; + #resolve: (value: Response | PromiseLike) => void; static #enqueue(controller: ReadableStreamDefaultController, chunk: Chunk) { if (typeof chunk === "string") { @@ -454,10 +445,7 @@ export class ServerResponse extends NodeWritable { return status === 101 || status === 204 || status === 205 || status === 304; } - constructor( - reqEvent: undefined | Deno.RequestEvent, - resolve: undefined | ((value: Response | PromiseLike) => void), - ) { + constructor(resolve: (value: Response | PromiseLike) => void) { let controller: ReadableByteStreamController; const readable = new ReadableStream({ start(c) { @@ -500,8 +488,6 @@ export class ServerResponse extends NodeWritable { }); this.#readable = readable; this.#resolve = resolve; - this.#reqEvent = reqEvent; - this.#isFlashRequest = typeof resolve !== "undefined"; } setHeader(name: string, value: string) { @@ -537,9 +523,8 @@ export class ServerResponse extends NodeWritable { this.statusCode = 200; this.statusMessage = "OK"; } - // Only taken if --unstable IS NOT present if ( - !this.#isFlashRequest && typeof singleChunk === "string" && + typeof singleChunk === "string" && !this.hasHeader("content-type") ) { this.setHeader("content-type", "text/plain;charset=UTF-8"); @@ -553,35 +538,19 @@ export class ServerResponse extends NodeWritable { if (ServerResponse.#bodyShouldBeNull(this.statusCode!)) { body = null; } - if (this.#isFlashRequest) { - this.#resolve!( - new Response(body, { - headers: this.#headers, - status: this.statusCode, - statusText: this.statusMessage, - }), - ); - } else { - this.#reqEvent!.respondWith( - new Response(body, { - headers: this.#headers, - status: this.statusCode, - statusText: this.statusMessage, - }), - ).catch(() => { - // ignore this error - }); - } + this.#resolve( + new Response(body, { + headers: this.#headers, + status: this.statusCode, + statusText: this.statusMessage, + }), + ); } // deno-lint-ignore no-explicit-any override end(chunk?: any, encoding?: any, cb?: any): this { this.finished = true; - if (this.#isFlashRequest) { - // Flash sets both of these headers. - this.#headers.delete("transfer-encoding"); - this.#headers.delete("content-length"); - } else if (!chunk && this.#headers.has("transfer-encoding")) { + if (!chunk && this.#headers.has("transfer-encoding")) { // FIXME(bnoordhuis) Node sends a zero length chunked body instead, i.e., // the trailing "0\r\n", but respondWith() just hangs when I try that. this.#headers.set("content-length", "0"); @@ -607,7 +576,7 @@ export class IncomingMessageForServer extends NodeReadable { // These properties are used by `npm:forwarded` for example. socket: { remoteAddress: string; remotePort: number }; - constructor(req: Request, conn: Deno.Conn) { + constructor(req: Request, remoteAddr: { hostname: string; port: number }) { // Check if no body (GET/HEAD/OPTIONS/...) const reader = req.body?.getReader(); super({ @@ -635,8 +604,8 @@ export class IncomingMessageForServer extends NodeReadable { this.url = req.url?.slice(req.url.indexOf("/", 8)); this.method = req.method; this.socket = { - remoteAddress: conn.remoteAddr.hostname, - remotePort: conn.remoteAddr.port, + remoteAddress: remoteAddr.hostname, + remotePort: remoteAddr.port, }; this.#req = req; } @@ -676,25 +645,19 @@ export function Server(handler?: ServerHandler): ServerImpl { } class ServerImpl extends EventEmitter { - #isFlashServer: boolean; - #httpConnections: Set = new Set(); #listener?: Deno.Listener; - #addr?: Deno.NetAddr; + #addr: Deno.NetAddr; #hasClosed = false; #ac?: AbortController; - #servePromise?: Deferred; + #servePromise: Deferred; listening = false; constructor(handler?: ServerHandler) { super(); - // @ts-ignore Might be undefined without `--unstable` flag - this.#isFlashServer = typeof DenoServe == "function"; - if (this.#isFlashServer) { - this.#servePromise = deferred(); - this.#servePromise.then(() => this.emit("close")); - } + this.#servePromise = deferred(); + this.#servePromise.then(() => this.emit("close")); if (handler !== undefined) { this.on("request", handler); } @@ -719,104 +682,31 @@ class ServerImpl extends EventEmitter { // TODO(bnoordhuis) Node prefers [::] when host is omitted, // we on the other hand default to 0.0.0.0. - if (this.#isFlashServer) { - const hostname = options.host ?? "0.0.0.0"; - this.#addr = { - hostname, - port, - } as Deno.NetAddr; - this.listening = true; - nextTick(() => this.#serve()); - } else { - this.listening = true; - const hostname = options.host ?? ""; - this.#listener = Deno.listen({ port, hostname }); - nextTick(() => this.#listenLoop()); - } + const hostname = options.host ?? "0.0.0.0"; + this.#addr = { + hostname, + port, + } as Deno.NetAddr; + this.listening = true; + nextTick(() => this.#serve()); return this; } - async #listenLoop() { - const go = async (httpConn: Deno.HttpConn) => { - try { - for (;;) { - let reqEvent = null; - try { - // Note: httpConn.nextRequest() calls httpConn.close() on error. - reqEvent = await httpConn.nextRequest(); - } catch { - // Connection closed. - // TODO(bnoordhuis) Emit "clientError" event on the http.Server - // instance? Node emits it when request parsing fails and expects - // the listener to send a raw 4xx HTTP response on the underlying - // net.Socket but we don't have one to pass to the listener. - } - if (reqEvent === null) { - break; - } - // HEAD - const req = new IncomingMessageForServer(reqEvent.request, tcpConn); - if (req.upgrade && this.listenerCount("upgrade") > 0) { - const conn = await upgradeHttpRaw( - reqEvent.request, - tcpConn, - ) as Deno.Conn; - const socket = new Socket({ - handle: new TCP(constants.SERVER, conn), - }); - this.emit("upgrade", req, socket, Buffer.from([])); - return; - } else { - const res = new ServerResponse(reqEvent); - this.emit("request", req, res); - } - // - const req = new IncomingMessageForServer(reqEvent.request); - const res = new ServerResponse(reqEvent, undefined); - this.emit("request", req, res); - //parent of 3cd7abf73 (refactor(ext/node): migrate "http" module to use "Deno.serveHttp" API (#18552)) - } - } finally { - this.#httpConnections.delete(httpConn); - } - }; - - const listener = this.#listener; - - if (listener !== undefined) { - this.emit("listening"); - - for await (const conn of listener) { - let httpConn: Deno.HttpConn; - try { - httpConn = Deno.serveHttp(conn); - } catch { - continue; /// Connection closed. - } - - this.#httpConnections.add(httpConn); - go(httpConn); - } - } - } - #serve() { const ac = new AbortController(); - const handler = (request: Request) => { - const req = new IncomingMessageForServer(request); + const handler = (request: Request, info: Deno.ServeHandlerInfo) => { + const req = new IncomingMessageForServer(request, info.remoteAddr); if (req.upgrade && this.listenerCount("upgrade") > 0) { - const [conn, head] = DenoUpgradeHttpRaw(request) as [ - Deno.Conn, - Uint8Array, - ]; + const { conn, response } = upgradeHttpRaw(request); const socket = new Socket({ handle: new TCP(constants.SERVER, conn), }); - this.emit("upgrade", req, socket, Buffer.from(head)); + this.emit("upgrade", req, socket, Buffer.from([])); + return response; } else { return new Promise((resolve): void => { - const res = new ServerResponse(undefined, resolve); + const res = new ServerResponse(resolve); this.emit("request", req, res); }); } @@ -826,7 +716,7 @@ class ServerImpl extends EventEmitter { return; } this.#ac = ac; - DenoServe( + serve( { handler: handler as Deno.ServeHandler, ...this.#addr, @@ -859,45 +749,20 @@ class ServerImpl extends EventEmitter { } } - if (this.#isFlashServer) { - if (listening && this.#ac) { - this.#ac.abort(); - this.#ac = undefined; - } else { - this.#servePromise!.resolve(); - } + if (listening && this.#ac) { + this.#ac.abort(); + this.#ac = undefined; } else { - nextTick(() => this.emit("close")); - - if (listening) { - this.#listener!.close(); - this.#listener = undefined; - - for (const httpConn of this.#httpConnections) { - try { - httpConn.close(); - } catch { - // Already closed. - } - } - - this.#httpConnections.clear(); - } + this.#servePromise!.resolve(); } return this; } address() { - let addr; - if (this.#isFlashServer) { - addr = this.#addr!; - } else { - addr = this.#listener!.addr as Deno.NetAddr; - } return { - port: addr.port, - address: addr.hostname, + port: this.#addr.port, + address: this.#addr.hostname, }; } } From 24b7b85a35d8c065540bd88adc291b9df30cf047 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 27 Apr 2023 01:16:13 +0200 Subject: [PATCH 11/12] lint --- ext/node/polyfills/http.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 749503228e2b59..47b312c2ee187d 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -1,7 +1,6 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. const core = globalThis.Deno.core; -const ops = core.ops; import { TextEncoder } from "ext:deno_web/08_text_encoding.js"; import { type Deferred, deferred } from "ext:deno_node/_util/async.ts"; import { _normalizeArgs, ListenOptions, Socket } from "ext:deno_node/net.ts"; From dfad6aa986e463f52efbe3e0f0d04c97b0591c19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 27 Apr 2023 01:27:46 +0200 Subject: [PATCH 12/12] lint again :) --- ext/node/polyfills/http.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 47b312c2ee187d..1a585f74ce1198 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -1,6 +1,5 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -const core = globalThis.Deno.core; import { TextEncoder } from "ext:deno_web/08_text_encoding.js"; import { type Deferred, deferred } from "ext:deno_node/_util/async.ts"; import { _normalizeArgs, ListenOptions, Socket } from "ext:deno_node/net.ts";