Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ext/node): windows cancel stdin read in line mode #23969

Merged
merged 5 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 126 additions & 17 deletions ext/io/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ use winapi::um::processenv::GetStdHandle;
#[cfg(windows)]
use winapi::um::winbase;

#[cfg(windows)]
use std::sync::Arc;
#[cfg(windows)]
use std::sync::Condvar;
#[cfg(windows)]
use std::sync::Mutex;
littledivy marked this conversation as resolved.
Show resolved Hide resolved

pub mod fs;
mod pipe;
#[cfg(windows)]
Expand Down Expand Up @@ -106,12 +113,21 @@ deno_core::extension!(deno_io,
},
state = |state, options| {
if let Some(stdio) = options.stdio {
#[cfg(windows)]
let stdin_state = {
let st = Arc::new(Mutex::new(WinTtyState::default()));
state.put(st.clone());
st
};
#[cfg(unix)]
let stdin_state = ();

let t = &mut state.resource_table;

let rid = t.add(fs::FileResource::new(
Rc::new(match stdio.stdin.pipe {
StdioPipeInner::Inherit => StdFileResourceInner::new(
StdFileResourceKind::Stdin,
StdFileResourceKind::Stdin(stdin_state),
STDIN_HANDLE.try_clone().unwrap(),
),
StdioPipeInner::File(pipe) => StdFileResourceInner::file(pipe),
Expand Down Expand Up @@ -317,14 +333,29 @@ impl Resource for ChildStderrResource {
}
}

#[derive(Clone, Copy)]
#[cfg(windows)]
use winapi::um::wincon::CONSOLE_SCREEN_BUFFER_INFO;
littledivy marked this conversation as resolved.
Show resolved Hide resolved

#[cfg(windows)]
#[derive(Default)]
pub struct WinTtyState {
pub cancelled: bool,
pub reading: bool,
pub screen_buffer_info: Option<CONSOLE_SCREEN_BUFFER_INFO>,
pub cvar: Arc<Condvar>,
}

#[derive(Clone)]
enum StdFileResourceKind {
File,
// For stdout and stderr, we sometimes instead use std::io::stdout() directly,
// because we get some Windows specific functionality for free by using Rust
// std's wrappers. So we take a bit of a complexity hit in order to not
// have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs
Stdin,
#[cfg(windows)]
Stdin(Arc<Mutex<WinTtyState>>),
#[cfg(not(windows))]
Stdin(()),
Stdout,
Stderr,
}
Expand Down Expand Up @@ -435,7 +466,7 @@ impl crate::fs::File for StdFileResourceInner {
// std/src/sys/windows/stdio.rs in Rust's source code).
match self.kind {
StdFileResourceKind::File => self.with_sync(|file| Ok(file.write(buf)?)),
StdFileResourceKind::Stdin => {
StdFileResourceKind::Stdin(_) => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
Expand All @@ -457,7 +488,7 @@ impl crate::fs::File for StdFileResourceInner {

fn read_sync(self: Rc<Self>, buf: &mut [u8]) -> FsResult<usize> {
match self.kind {
StdFileResourceKind::File | StdFileResourceKind::Stdin => {
StdFileResourceKind::File | StdFileResourceKind::Stdin(_) => {
self.with_sync(|file| Ok(file.read(buf)?))
}
StdFileResourceKind::Stdout | StdFileResourceKind::Stderr => {
Expand All @@ -471,7 +502,7 @@ impl crate::fs::File for StdFileResourceInner {
StdFileResourceKind::File => {
self.with_sync(|file| Ok(file.write_all(buf)?))
}
StdFileResourceKind::Stdin => {
StdFileResourceKind::Stdin(_) => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
Expand All @@ -497,7 +528,7 @@ impl crate::fs::File for StdFileResourceInner {
.with_inner_blocking_task(move |file| Ok(file.write_all(&buf)?))
.await
}
StdFileResourceKind::Stdin => {
StdFileResourceKind::Stdin(_) => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
Expand Down Expand Up @@ -538,7 +569,7 @@ impl crate::fs::File for StdFileResourceInner {
})
.await
}
StdFileResourceKind::Stdin => {
StdFileResourceKind::Stdin(_) => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
Expand Down Expand Up @@ -568,7 +599,7 @@ impl crate::fs::File for StdFileResourceInner {

fn read_all_sync(self: Rc<Self>) -> FsResult<Vec<u8>> {
match self.kind {
StdFileResourceKind::File | StdFileResourceKind::Stdin => {
StdFileResourceKind::File | StdFileResourceKind::Stdin(_) => {
let mut buf = Vec::new();
self.with_sync(|file| Ok(file.read_to_end(&mut buf)?))?;
Ok(buf)
Expand All @@ -580,7 +611,7 @@ impl crate::fs::File for StdFileResourceInner {
}
async fn read_all_async(self: Rc<Self>) -> FsResult<Vec<u8>> {
match self.kind {
StdFileResourceKind::File | StdFileResourceKind::Stdin => {
StdFileResourceKind::File | StdFileResourceKind::Stdin(_) => {
self
.with_inner_blocking_task(|file| {
let mut buf = Vec::new();
Expand Down Expand Up @@ -736,19 +767,97 @@ impl crate::fs::File for StdFileResourceInner {
self: Rc<Self>,
mut buf: BufMutView,
) -> FsResult<(usize, BufMutView)> {
self
.with_inner_blocking_task(|file| {
let nread = file.read(&mut buf)?;
Ok((nread, buf))
})
.await
match &self.kind {
/* On Windows, we need to handle special read cancellation logic for stdin */
#[cfg(windows)]
StdFileResourceKind::Stdin(state) => {
loop {
let state = state.clone();
littledivy marked this conversation as resolved.
Show resolved Hide resolved

let fut = self.with_inner_blocking_task(move |file| {
/* Start reading, and set the reading flag to true */
state.lock().unwrap().reading = true;
littledivy marked this conversation as resolved.
Show resolved Hide resolved
let nread = match file.read(&mut buf) {
Ok(nread) => nread,
Err(e) => return Err((e.into(), buf)),
};

let mut state = state.lock().unwrap();
state.reading = false;

/* If we canceled the read by sending a VK_RETURN event, restore
the screen state to undo the visual effect of the VK_RETURN event */
if state.cancelled {
if let Some(screen_buffer_info) = state.screen_buffer_info {
// SAFETY: WinAPI calls to open conout$ and restore visual state.
unsafe {
let handle = winapi::um::fileapi::CreateFileW(
"conout$"
.encode_utf16()
.chain(Some(0))
.collect::<Vec<_>>()
.as_ptr(),
winapi::um::winnt::GENERIC_READ
| winapi::um::winnt::GENERIC_WRITE,
winapi::um::winnt::FILE_SHARE_READ
| winapi::um::winnt::FILE_SHARE_WRITE,
std::ptr::null_mut(),
winapi::um::fileapi::OPEN_EXISTING,
0,
std::ptr::null_mut(),
);

let mut pos = screen_buffer_info.dwCursorPosition;
/* If the cursor was at the bottom line of the screen buffer, the
VK_RETURN would have caused the buffer contents to scroll up by
one line. The right position to reset the cursor to is therefore one
line higher */
if pos.Y == screen_buffer_info.dwSize.Y - 1 {
pos.Y -= 1;
}

winapi::um::wincon::SetConsoleCursorPosition(handle, pos);
winapi::um::handleapi::CloseHandle(handle);
}
}

/* Reset the cancelled flag */
state.cancelled = false;

/* Unblock the main thread */
state.cvar.notify_one();

return Err((FsError::FileBusy, buf));
}
littledivy marked this conversation as resolved.
Show resolved Hide resolved

Ok((nread, buf))
});

match fut.await {
Err((FsError::FileBusy, b)) => {
buf = b;
continue;
}
other => return other.map_err(|(e, _)| e),
}
}
}
_ => {
self
.with_inner_blocking_task(|file| {
let nread = file.read(&mut buf)?;
Ok((nread, buf))
})
.await
}
}
}

fn try_clone_inner(self: Rc<Self>) -> FsResult<Rc<dyn fs::File>> {
let inner: &Option<_> = &self.cell.borrow();
match inner {
Some(inner) => Ok(Rc::new(StdFileResourceInner {
kind: self.kind,
kind: self.kind.clone(),
cell: RefCell::new(Some(inner.try_clone()?)),
cell_async_task_queue: Default::default(),
handle: self.handle,
Expand Down
81 changes: 81 additions & 0 deletions runtime/ops/tty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ use rustyline::KeyCode;
use rustyline::KeyEvent;
use rustyline::Modifiers;

#[cfg(windows)]
use deno_io::WinTtyState;
#[cfg(windows)]
use std::sync::Arc;
#[cfg(windows)]
use std::sync::Mutex;

#[cfg(unix)]
use deno_core::ResourceId;
#[cfg(unix)]
Expand Down Expand Up @@ -94,6 +101,7 @@ fn op_set_raw(
#[cfg(windows)]
{
use winapi::shared::minwindef::FALSE;

use winapi::um::consoleapi;

let handle = handle_or_fd;
Expand All @@ -116,6 +124,79 @@ fn op_set_raw(
mode_raw_input_off(original_mode)
};

let stdin_state = state.borrow::<Arc<Mutex<WinTtyState>>>();
let mut stdin_state = stdin_state.lock().unwrap();

if stdin_state.reading {
let cvar = stdin_state.cvar.clone();

/* Trick to unblock an ongoing line-buffered read operation if not already pending.
See https://github.com/libuv/libuv/pull/866 for prior art */
if original_mode & COOKED_MODE != 0 && !stdin_state.cancelled {
// SAFETY: Write enter key event to force the console wait to return.
let record = unsafe {
let mut record: wincon::INPUT_RECORD = std::mem::zeroed();
record.EventType = wincon::KEY_EVENT;
record.Event.KeyEvent_mut().wVirtualKeyCode =
winapi::um::winuser::VK_RETURN as u16;
record.Event.KeyEvent_mut().bKeyDown = 1;
record.Event.KeyEvent_mut().wRepeatCount = 1;
*record.Event.KeyEvent_mut().uChar.UnicodeChar_mut() = '\r' as u16;
record.Event.KeyEvent_mut().dwControlKeyState = 0;
record.Event.KeyEvent_mut().wVirtualScanCode =
winapi::um::winuser::MapVirtualKeyW(
winapi::um::winuser::VK_RETURN as u32,
winapi::um::winuser::MAPVK_VK_TO_VSC,
) as u16;
record
};
stdin_state.cancelled = true;

// SAFETY: winapi call to open conout$ and save screen state.
let active_screen_buffer = unsafe {
/* Save screen state before sending the VK_RETURN event */
let handle = winapi::um::fileapi::CreateFileW(
"conout$"
.encode_utf16()
.chain(Some(0))
.collect::<Vec<_>>()
.as_ptr(),
winapi::um::winnt::GENERIC_READ | winapi::um::winnt::GENERIC_WRITE,
winapi::um::winnt::FILE_SHARE_READ
| winapi::um::winnt::FILE_SHARE_WRITE,
std::ptr::null_mut(),
winapi::um::fileapi::OPEN_EXISTING,
0,
std::ptr::null_mut(),
);

let mut active_screen_buffer = std::mem::zeroed();
winapi::um::wincon::GetConsoleScreenBufferInfo(
handle,
&mut active_screen_buffer,
);
winapi::um::handleapi::CloseHandle(handle);
active_screen_buffer
};
stdin_state.screen_buffer_info = Some(active_screen_buffer);

// SAFETY: winapi call to write the VK_RETURN event.
if unsafe {
winapi::um::wincon::WriteConsoleInputW(handle, &record, 1, &mut 0)
} == FALSE
{
return Err(Error::last_os_error().into());
}

/* Wait for read thread to acknowledge the cancellation to ensure that nothing
interferes with the screen state.
NOTE: `wait_while` automatically unlocks stdin_state */
let _unused = cvar
.wait_while(stdin_state, |state| state.cancelled)
.unwrap();
}
}

// SAFETY: winapi call
if unsafe { consoleapi::SetConsoleMode(handle, new_mode) } == FALSE {
return Err(Error::last_os_error().into());
Expand Down
14 changes: 14 additions & 0 deletions tests/integration/run_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3202,6 +3202,20 @@ itest!(byte_order_mark {
output: "run/byte_order_mark.out",
});

#[test]
#[cfg(windows)]
fn process_stdin_read_unblock() {
TestContext::default()
.new_command()
.args_vec(["run", "run/process_stdin_unblock.mjs"])
.with_pty(|mut console| {
console.write_raw("b");
console.human_delay();
console.write_line_raw("s");
console.expect_all(&["1", "1"]);
});
}

#[test]
fn issue9750() {
TestContext::default()
Expand Down
21 changes: 21 additions & 0 deletions tests/testdata/run/process_stdin_unblock.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import process from "node:process";

function prompt() {
process.stdin.setRawMode(true);

const { promise, resolve } = Promise.withResolvers();

const onData = (buf) => {
process.stdin.setRawMode(false);
process.stdin.removeListener("data", onData);
console.log(buf.length);
resolve();
};

process.stdin.on("data", onData);
return promise;
}

await prompt();
await prompt();
Deno.exit(0);