diff --git a/cli/tests/integration/mod.rs b/cli/tests/integration/mod.rs index 277b6a5d6a4db4..3101d8dc76ebd2 100644 --- a/cli/tests/integration/mod.rs +++ b/cli/tests/integration/mod.rs @@ -662,6 +662,46 @@ fn websocketstream() { assert!(status.success()); } +#[test] +fn websocketstream_ping() { + use deno_runtime::deno_websocket::tokio_tungstenite::tungstenite; + let _g = util::http_server(); + + let script = util::testdata_path().join("websocketstream_ping_test.ts"); + let root_ca = util::testdata_path().join("tls/RootCA.pem"); + let mut child = util::deno_cmd() + .arg("test") + .arg("--unstable") + .arg("--allow-net") + .arg("--cert") + .arg(root_ca) + .arg(script) + .stdout(std::process::Stdio::piped()) + .spawn() + .unwrap(); + + let server = std::net::TcpListener::bind("127.0.0.1:4513").unwrap(); + let (stream, _) = server.accept().unwrap(); + let mut socket = tungstenite::accept(stream).unwrap(); + socket + .write_message(tungstenite::Message::Text(String::from("A"))) + .unwrap(); + socket + .write_message(tungstenite::Message::Ping(vec![])) + .unwrap(); + socket + .write_message(tungstenite::Message::Text(String::from("B"))) + .unwrap(); + let message = socket.read_message().unwrap(); + assert_eq!(message, tungstenite::Message::Pong(vec![])); + socket + .write_message(tungstenite::Message::Text(String::from("C"))) + .unwrap(); + socket.close(None).unwrap(); + + assert!(child.wait().unwrap().success()); +} + #[test] fn websocket_server_multi_field_connection_header() { let script = util::testdata_path() diff --git a/cli/tests/testdata/websocketstream_ping_test.ts b/cli/tests/testdata/websocketstream_ping_test.ts new file mode 100644 index 00000000000000..12f847cd8e4664 --- /dev/null +++ b/cli/tests/testdata/websocketstream_ping_test.ts @@ -0,0 +1,5 @@ +const wss = new WebSocketStream("ws://127.0.0.1:4513"); +const { readable } = await wss.connection; +for await (const _ of readable) { + // +} diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js index 50dfac284eb81a..df87c0c9729401 100644 --- a/ext/websocket/02_websocketstream.js +++ b/ext/websocket/02_websocketstream.js @@ -232,6 +232,43 @@ await this.closed; }, }); + const pull = async (controller) => { + const { kind, value } = await core.opAsync( + "op_ws_next_event", + this[_rid], + ); + + switch (kind) { + case "string": { + controller.enqueue(value); + break; + } + case "binary": { + controller.enqueue(value); + break; + } + case "ping": { + await core.opAsync("op_ws_send", this[_rid], { + kind: "pong", + }); + await pull(controller); + break; + } + case "closed": + case "close": { + this[_closed].resolve(value); + core.tryClose(this[_rid]); + break; + } + case "error": { + const err = new Error(value); + this[_closed].reject(err); + controller.error(err); + core.tryClose(this[_rid]); + break; + } + } + }; const readable = new ReadableStream({ start: (controller) => { PromisePrototypeThen(this.closed, () => { @@ -250,42 +287,7 @@ } }); }, - pull: async (controller) => { - const { kind, value } = await core.opAsync( - "op_ws_next_event", - this[_rid], - ); - - switch (kind) { - case "string": { - controller.enqueue(value); - break; - } - case "binary": { - controller.enqueue(value); - break; - } - case "ping": { - await core.opAsync("op_ws_send", this[_rid], { - kind: "pong", - }); - break; - } - case "closed": - case "close": { - this[_closed].resolve(value); - core.tryClose(this[_rid]); - break; - } - case "error": { - const err = new Error(value); - this[_closed].reject(err); - controller.error(err); - core.tryClose(this[_rid]); - break; - } - } - }, + pull, cancel: async (reason) => { try { this.close(reason?.code !== undefined ? reason : {});