Skip to content

Commit

Permalink
fix: WebSocketStream ping event causes pending promises (#15235)
Browse files Browse the repository at this point in the history
  • Loading branch information
crowlKats authored Jul 18, 2022
1 parent 2bebdc9 commit 2eb27c9
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 36 deletions.
40 changes: 40 additions & 0 deletions cli/tests/integration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,46 @@ fn websocketstream() {
assert!(status.success());
}

#[test]
fn websocketstream_ping() {
use deno_runtime::deno_websocket::tokio_tungstenite::tungstenite;
let _g = util::http_server();

let script = util::testdata_path().join("websocketstream_ping_test.ts");
let root_ca = util::testdata_path().join("tls/RootCA.pem");
let mut child = util::deno_cmd()
.arg("test")
.arg("--unstable")
.arg("--allow-net")
.arg("--cert")
.arg(root_ca)
.arg(script)
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();

let server = std::net::TcpListener::bind("127.0.0.1:4513").unwrap();
let (stream, _) = server.accept().unwrap();
let mut socket = tungstenite::accept(stream).unwrap();
socket
.write_message(tungstenite::Message::Text(String::from("A")))
.unwrap();
socket
.write_message(tungstenite::Message::Ping(vec![]))
.unwrap();
socket
.write_message(tungstenite::Message::Text(String::from("B")))
.unwrap();
let message = socket.read_message().unwrap();
assert_eq!(message, tungstenite::Message::Pong(vec![]));
socket
.write_message(tungstenite::Message::Text(String::from("C")))
.unwrap();
socket.close(None).unwrap();

assert!(child.wait().unwrap().success());
}

#[test]
fn websocket_server_multi_field_connection_header() {
let script = util::testdata_path()
Expand Down
5 changes: 5 additions & 0 deletions cli/tests/testdata/websocketstream_ping_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
const wss = new WebSocketStream("ws://127.0.0.1:4513");
const { readable } = await wss.connection;
for await (const _ of readable) {
//
}
74 changes: 38 additions & 36 deletions ext/websocket/02_websocketstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,43 @@
await this.closed;
},
});
const pull = async (controller) => {
const { kind, value } = await core.opAsync(
"op_ws_next_event",
this[_rid],
);

switch (kind) {
case "string": {
controller.enqueue(value);
break;
}
case "binary": {
controller.enqueue(value);
break;
}
case "ping": {
await core.opAsync("op_ws_send", this[_rid], {
kind: "pong",
});
await pull(controller);
break;
}
case "closed":
case "close": {
this[_closed].resolve(value);
core.tryClose(this[_rid]);
break;
}
case "error": {
const err = new Error(value);
this[_closed].reject(err);
controller.error(err);
core.tryClose(this[_rid]);
break;
}
}
};
const readable = new ReadableStream({
start: (controller) => {
PromisePrototypeThen(this.closed, () => {
Expand All @@ -250,42 +287,7 @@
}
});
},
pull: async (controller) => {
const { kind, value } = await core.opAsync(
"op_ws_next_event",
this[_rid],
);

switch (kind) {
case "string": {
controller.enqueue(value);
break;
}
case "binary": {
controller.enqueue(value);
break;
}
case "ping": {
await core.opAsync("op_ws_send", this[_rid], {
kind: "pong",
});
break;
}
case "closed":
case "close": {
this[_closed].resolve(value);
core.tryClose(this[_rid]);
break;
}
case "error": {
const err = new Error(value);
this[_closed].reject(err);
controller.error(err);
core.tryClose(this[_rid]);
break;
}
}
},
pull,
cancel: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});
Expand Down

0 comments on commit 2eb27c9

Please sign in to comment.