From 5d8860f3dd40dff0ae1cb4831750eb2b5094b645 Mon Sep 17 00:00:00 2001 From: Mark Thomas Date: Sun, 28 Apr 2019 17:31:35 +0100 Subject: [PATCH] add terminal wakers Terminal wakers allow other threads to wake the main terminal processing thread. --- termwiz/Cargo.toml | 3 + termwiz/examples/hello.rs | 4 +- termwiz/examples/key_tester.rs | 4 +- termwiz/examples/widgets_basic.rs | 3 +- termwiz/src/input.rs | 2 + termwiz/src/render/terminfo.rs | 9 ++- termwiz/src/terminal/mod.rs | 23 ++++--- termwiz/src/terminal/unix.rs | 100 +++++++++++++++++++----------- termwiz/src/terminal/windows.rs | 99 ++++++++++++++++++++++++++--- termwiz/src/widgets/mod.rs | 6 +- 10 files changed, 190 insertions(+), 63 deletions(-) diff --git a/termwiz/Cargo.toml b/termwiz/Cargo.toml index 63bdf6c29a5..9b71cd5eee1 100644 --- a/termwiz/Cargo.toml +++ b/termwiz/Cargo.toml @@ -41,9 +41,12 @@ signal-hook = "~0.1" termios = "~0.3" [target."cfg(windows)".dependencies.winapi] features = [ + "winbase", + "winerror", "winuser", "consoleapi", "handleapi", "fileapi", + "synchapi", ] version = "~0.3" diff --git a/termwiz/examples/hello.rs b/termwiz/examples/hello.rs index e800f195173..3d6997258ba 100644 --- a/termwiz/examples/hello.rs +++ b/termwiz/examples/hello.rs @@ -8,7 +8,7 @@ use termwiz::color::AnsiColor; use termwiz::input::{InputEvent, KeyCode, KeyEvent}; use termwiz::surface::{Change, Position, Surface}; use termwiz::terminal::buffered::BufferedTerminal; -use termwiz::terminal::{new_terminal, Blocking, Terminal}; +use termwiz::terminal::{new_terminal, Terminal}; fn main() -> Result<(), Error> { let caps = Capabilities::new_from_env()?; @@ -38,7 +38,7 @@ fn main() -> Result<(), Error> { buf.terminal().set_raw_mode()?; loop { - match buf.terminal().poll_input(Blocking::Wait) { + match buf.terminal().poll_input(None) { Ok(Some(input)) => match input { InputEvent::Key(KeyEvent { key: KeyCode::Escape, diff --git a/termwiz/examples/key_tester.rs b/termwiz/examples/key_tester.rs index 3aeea06fd85..50e0f8d317a 100644 --- a/termwiz/examples/key_tester.rs +++ b/termwiz/examples/key_tester.rs @@ -4,7 +4,7 @@ extern crate termwiz; use failure::Error; use termwiz::caps::Capabilities; use termwiz::input::{InputEvent, KeyCode, KeyEvent, Modifiers}; -use termwiz::terminal::{new_terminal, Blocking, Terminal}; +use termwiz::terminal::{new_terminal, Terminal}; const CTRL_C: KeyEvent = KeyEvent { key: KeyCode::Char('C'), @@ -16,7 +16,7 @@ fn main() -> Result<(), Error> { let mut terminal = new_terminal(caps)?; terminal.set_raw_mode()?; - while let Some(event) = terminal.poll_input(Blocking::Wait)? { + while let Some(event) = terminal.poll_input(None)? { print!("{:?}\r\n", event); if event == InputEvent::Key(CTRL_C) { break; diff --git a/termwiz/examples/widgets_basic.rs b/termwiz/examples/widgets_basic.rs index 9efcb5666b1..fcce3f32f99 100644 --- a/termwiz/examples/widgets_basic.rs +++ b/termwiz/examples/widgets_basic.rs @@ -10,7 +10,6 @@ use termwiz::color::{AnsiColor, ColorAttribute, RgbColor}; use termwiz::input::*; use termwiz::surface::Change; use termwiz::terminal::buffered::BufferedTerminal; -use termwiz::terminal::Blocking; use termwiz::terminal::{new_terminal, Terminal}; use termwiz::widgets::*; @@ -115,7 +114,7 @@ fn main() -> Result<(), Error> { buf.flush()?; // Wait for user input - match buf.terminal().poll_input(Blocking::Wait) { + match buf.terminal().poll_input(None) { Ok(Some(InputEvent::Resized { rows, cols })) => { // FIXME: this is working around a bug where we don't realize // that we should redraw everything on resize in BufferedTerminal. diff --git a/termwiz/src/input.rs b/termwiz/src/input.rs index 3dcf503cd57..1e05e946a56 100644 --- a/termwiz/src/input.rs +++ b/termwiz/src/input.rs @@ -49,6 +49,8 @@ pub enum InputEvent { /// For terminals that support Bracketed Paste mode, /// pastes are collected and reported as this variant. Paste(String), + /// The program has woken the input thread. + Wake, } #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/termwiz/src/render/terminfo.rs b/termwiz/src/render/terminfo.rs index 0343b287434..806a9fbe227 100644 --- a/termwiz/src/render/terminfo.rs +++ b/termwiz/src/render/terminfo.rs @@ -689,11 +689,12 @@ mod test { use crate::input::InputEvent; use crate::terminal::unix::{Purge, SetAttributeWhen, UnixTty}; use crate::terminal::ScreenSize; - use crate::terminal::{cast, Blocking, Terminal}; + use crate::terminal::{cast, Terminal, TerminalWaker}; use failure::Error; use libc::winsize; use std::io::{Error as IoError, ErrorKind, Read, Result as IoResult, Write}; use std::mem; + use std::time::Duration; use terminfo; use termios::Termios; @@ -856,9 +857,13 @@ mod test { Ok(()) } - fn poll_input(&mut self, _blocking: Blocking) -> Result, Error> { + fn poll_input(&mut self, _wait: Option) -> Result, Error> { bail!("not implemented"); } + + fn waker(&self) -> TerminalWaker { + unimplemented!(); + } } #[test] diff --git a/termwiz/src/terminal/mod.rs b/termwiz/src/terminal/mod.rs index cd1dd487c77..325de633ef2 100644 --- a/termwiz/src/terminal/mod.rs +++ b/termwiz/src/terminal/mod.rs @@ -6,6 +6,7 @@ use crate::surface::Change; use failure::Error; use num::{self, NumCast}; use std::fmt::Display; +use std::time::Duration; #[cfg(unix)] pub mod unix; @@ -15,9 +16,9 @@ pub mod windows; pub mod buffered; #[cfg(unix)] -pub use self::unix::UnixTerminal; +pub use self::unix::{UnixTerminal, UnixTerminalWaker as TerminalWaker}; #[cfg(windows)] -pub use self::windows::WindowsTerminal; +pub use self::windows::{WindowsTerminal, WindowsTerminalWaker as TerminalWaker}; /// Represents the size of the terminal screen. /// The number of rows and columns of character cells are expressed. @@ -76,16 +77,20 @@ pub trait Terminal { fn flush(&mut self) -> Result<(), Error>; /// Check for a parsed input event. - /// `blocking` indicates the behavior in the case that no input is - /// immediately available. If blocking == `Blocking::Wait` then - /// `poll_input` will not return until an event is available. - /// If blocking == `Blocking:DoNotWait` then `poll_input` will return - /// immediately with a value of `Ok(None)`. + /// `wait` indicates the behavior in the case that no input is + /// immediately available. If wait is `None` then `poll_input` + /// will not return until an event is available. If wait is + /// `Some(duration)` then `poll_input` will wait up to the given + /// duration for an event before returning with a value of + /// `Ok(None)`. If wait is `Some(Duration::new(0, 0))` then + /// the poll is non-blocking. /// /// The possible values returned as `InputEvent`s depend on the - /// mode of the terminal. Most modes are not returned unless + /// mode of the terminal. Most values are not returned unless /// the terminal is set to raw mode. - fn poll_input(&mut self, blocking: Blocking) -> Result, Error>; + fn poll_input(&mut self, wait: Option) -> Result, Error>; + + fn waker(&self) -> TerminalWaker; } /// `SystemTerminal` is a concrete implementation of `Terminal`. diff --git a/termwiz/src/terminal/unix.rs b/termwiz/src/terminal/unix.rs index 9a8bc26f601..2a4eda4dc59 100644 --- a/termwiz/src/terminal/unix.rs +++ b/termwiz/src/terminal/unix.rs @@ -9,6 +9,8 @@ use std::mem; use std::ops::Deref; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::UnixStream; +use std::sync::{Arc, Mutex}; +use std::time::Duration; use termios::{ cfmakeraw, tcdrain, tcflush, tcsetattr, Termios, TCIFLUSH, TCIOFLUSH, TCOFLUSH, TCSADRAIN, TCSAFLUSH, TCSANOW, @@ -223,9 +225,11 @@ pub struct UnixTerminal { saved_termios: Termios, renderer: TerminfoRenderer, input_parser: InputParser, - input_queue: Option>, + input_queue: VecDeque, sigwinch_id: SigId, sigwinch_pipe: UnixStream, + wake_pipe: UnixStream, + wake_pipe_write: Arc>, caps: Capabilities, in_alternate_screen: bool, } @@ -250,11 +254,13 @@ impl UnixTerminal { let saved_termios = write.get_termios()?; let renderer = TerminfoRenderer::new(caps.clone()); let input_parser = InputParser::new(); - let input_queue = None; + let input_queue = VecDeque::new(); - let (sigwinch_pipe, pipe_write) = UnixStream::pair()?; - let sigwinch_id = signal_hook::pipe::register(libc::SIGWINCH, pipe_write)?; + let (sigwinch_pipe, sigwinch_pipe_write) = UnixStream::pair()?; + let sigwinch_id = signal_hook::pipe::register(libc::SIGWINCH, sigwinch_pipe_write)?; sigwinch_pipe.set_nonblocking(true)?; + let (wake_pipe, wake_pipe_write) = UnixStream::pair()?; + wake_pipe.set_nonblocking(true)?; read.set_blocking(Blocking::DoNotWait)?; @@ -268,6 +274,8 @@ impl UnixTerminal { input_queue, sigwinch_pipe, sigwinch_id, + wake_pipe, + wake_pipe_write: Arc::new(Mutex::new(wake_pipe_write)), in_alternate_screen: false, }) } @@ -304,6 +312,19 @@ impl UnixTerminal { } } +#[derive(Clone)] +pub struct UnixTerminalWaker { + pipe: Arc>, +} + +impl UnixTerminalWaker { + pub fn wake(&self) -> Result<(), IoError> { + let mut pipe = self.pipe.lock().unwrap(); + pipe.write(b"W")?; + Ok(()) + } +} + impl Terminal for UnixTerminal { fn set_raw_mode(&mut self) -> Result<(), Error> { let mut raw = self.write.get_termios()?; @@ -394,11 +415,9 @@ impl Terminal for UnixTerminal { .map_err(|e| format_err!("flush failed: {}", e)) } - fn poll_input(&mut self, blocking: Blocking) -> Result, Error> { - if let Some(ref mut queue) = self.input_queue { - if let Some(event) = queue.pop_front() { - return Ok(Some(event)); - } + fn poll_input(&mut self, wait: Option) -> Result, Error> { + if let Some(event) = self.input_queue.pop_front() { + return Ok(Some(event)); } // Some unfortunately verbose code here. In order to safely hook and process @@ -413,13 +432,18 @@ impl Terminal for UnixTerminal { // integrate. let mut pfd = [ + pollfd { + fd: self.sigwinch_pipe.as_raw_fd(), + events: POLLIN, + revents: 0, + }, pollfd { fd: self.read.fd.fd, events: POLLIN, revents: 0, }, pollfd { - fd: self.sigwinch_pipe.as_raw_fd(), + fd: self.wake_pipe.as_raw_fd(), events: POLLIN, revents: 0, }, @@ -429,11 +453,8 @@ impl Terminal for UnixTerminal { poll( pfd.as_mut_ptr(), pfd.len() as _, - if blocking == Blocking::DoNotWait { - 0 // Immediate - } else { - -1 // Infinite - }, + wait.map(|wait| wait.as_millis() as libc::c_int) + .unwrap_or(-1), ) }; if poll_result < 0 { @@ -452,40 +473,45 @@ impl Terminal for UnixTerminal { return Err(format_err!("poll(2) error: {}", err)); } - if pfd[1].revents != 0 { + if pfd[0].revents != 0 { // SIGWINCH received via our pipe? if let Some(resize) = self.caught_sigwinch()? { return Ok(Some(resize)); } } - if pfd[0].revents != 0 { + if pfd[1].revents != 0 { let mut buf = [0u8; 64]; match self.read.read(&mut buf) { Ok(n) => { - // A little bit of a dance with moving the queue out of self - // to appease the borrow checker. We'll need to be sure to - // move it back before we return! - let mut queue = match self.input_queue.take() { - Some(queue) => queue, - None => VecDeque::new(), - }; - self.input_parser - .parse(&buf[0..n], |evt| queue.push_back(evt), n == buf.len()); - let result = queue.pop_front(); - // Move the queue back into self before we leave this scope - self.input_queue = Some(queue); - Ok(result) + let input_queue = &mut self.input_queue; + self.input_parser.parse( + &buf[0..n], + |evt| input_queue.push_back(evt), + n == buf.len(), + ); + return Ok(self.input_queue.pop_front()); } Err(ref e) - if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::Interrupted => - { - Ok(None) - } - Err(e) => Err(format_err!("failed to read input {}", e)), + if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::Interrupted => {} + Err(e) => return Err(format_err!("failed to read input {}", e)), } - } else { - Ok(None) + } + + if pfd[2].revents != 0 { + let mut buf = [0u8; 64]; + match self.wake_pipe.read(&mut buf) { + Ok(_) => return Ok(Some(InputEvent::Wake)), + Err(_) => {} + } + } + + Ok(None) + } + + fn waker(&self) -> UnixTerminalWaker { + UnixTerminalWaker { + pipe: self.wake_pipe_write.clone(), } } } diff --git a/termwiz/src/terminal/windows.rs b/termwiz/src/terminal/windows.rs index 73f0d3e64fd..efa028b0f16 100644 --- a/termwiz/src/terminal/windows.rs +++ b/termwiz/src/terminal/windows.rs @@ -5,11 +5,16 @@ use std::collections::VecDeque; use std::fs::OpenOptions; use std::io::{stdin, stdout, Error as IoError, Read, Result as IoResult, Write}; use std::os::windows::io::{AsRawHandle, RawHandle}; +use std::sync::Arc; +use std::time::Duration; use std::{mem, ptr}; +use winapi::shared::winerror::WAIT_TIMEOUT; use winapi::um::consoleapi; use winapi::um::fileapi::{ReadFile, WriteFile}; use winapi::um::handleapi::*; use winapi::um::processthreadsapi::GetCurrentProcess; +use winapi::um::synchapi::{CreateEventW, SetEvent, WaitForMultipleObjects}; +use winapi::um::winbase::{INFINITE, WAIT_FAILED, WAIT_OBJECT_0}; use winapi::um::wincon::{ FillConsoleOutputAttribute, FillConsoleOutputCharacterW, GetConsoleScreenBufferInfo, ScrollConsoleScreenBufferW, SetConsoleCursorPosition, SetConsoleScreenBufferSize, @@ -24,7 +29,7 @@ use crate::caps::Capabilities; use crate::input::{InputEvent, InputParser}; use crate::render::windows::WindowsConsoleRenderer; use crate::surface::Change; -use crate::terminal::{cast, Blocking, ScreenSize, Terminal}; +use crate::terminal::{cast, ScreenSize, Terminal}; const BUF_SIZE: usize = 128; @@ -174,6 +179,40 @@ impl OutputHandle { } } +struct EventHandle { + handle: RawHandle, +} + +impl EventHandle { + fn new() -> IoResult { + let handle = unsafe { CreateEventW(ptr::null_mut(), 0, 0, ptr::null_mut()) }; + if handle.is_null() { + Err(IoError::last_os_error()) + } else { + Ok(Self { + handle: handle as *mut _, + }) + } + } + + fn set(&self) -> IoResult<()> { + let ok = unsafe { SetEvent(self.handle as *mut _) }; + if ok == 0 { + Err(IoError::last_os_error()) + } else { + Ok(()) + } + } +} + +impl Drop for EventHandle { + fn drop(&mut self) { + unsafe { + CloseHandle(self.handle as *mut _); + } + } +} + fn do_write(handle: RawHandle, buf: &[u8]) -> IoResult { let mut num_wrote = 0; let ok = unsafe { @@ -375,6 +414,7 @@ impl ConsoleOutputHandle for OutputHandle { pub struct WindowsTerminal { input_handle: InputHandle, output_handle: OutputHandle, + waker_handle: Arc, saved_input_mode: u32, saved_output_mode: u32, renderer: WindowsConsoleRenderer, @@ -417,6 +457,7 @@ impl WindowsTerminal { let mut input_handle = InputHandle { handle: dup(read)? }; let mut output_handle = OutputHandle::new(dup(write)?); + let waker_handle = Arc::new(EventHandle::new()?); let saved_input_mode = input_handle.get_input_mode()?; let saved_output_mode = output_handle.get_output_mode()?; @@ -426,6 +467,7 @@ impl WindowsTerminal { Ok(Self { input_handle, output_handle, + waker_handle, saved_input_mode, saved_output_mode, renderer, @@ -456,6 +498,18 @@ impl WindowsTerminal { } } +#[derive(Clone)] +pub struct WindowsTerminalWaker { + handle: Arc, +} + +impl WindowsTerminalWaker { + pub fn wake(&mut self) -> IoResult<()> { + self.handle.set()?; + Ok(()) + } +} + impl Terminal for WindowsTerminal { fn set_raw_mode(&mut self) -> Result<(), Error> { let mode = self.input_handle.get_input_mode()?; @@ -525,17 +579,42 @@ impl Terminal for WindowsTerminal { .map_err(|e| format_err!("flush failed: {}", e)) } - fn poll_input(&mut self, blocking: Blocking) -> Result, Error> { + fn poll_input(&mut self, wait: Option) -> Result, Error> { loop { if let Some(event) = self.input_queue.pop_front() { return Ok(Some(event)); } - let pending = match (self.input_handle.get_number_of_input_events()?, blocking) { - (0, Blocking::DoNotWait) => return Ok(None), - (0, Blocking::Wait) => 1, - (pending, _) => pending, - }; + let mut pending = self.input_handle.get_number_of_input_events()?; + + if pending == 0 { + let mut handles = [ + self.input_handle.handle as *mut _, + self.waker_handle.handle as *mut _, + ]; + let result = unsafe { + WaitForMultipleObjects( + 2, + handles.as_mut_ptr(), + 0, + wait.map(|wait| wait.as_millis() as u32).unwrap_or(INFINITE), + ) + }; + if result == WAIT_OBJECT_0 + 0 { + pending = 1; + } else if result == WAIT_OBJECT_0 + 1 { + return Ok(Some(InputEvent::Wake)); + } else if result == WAIT_FAILED { + bail!( + "failed to WaitForMultipleObjects: {}", + IoError::last_os_error() + ); + } else if result == WAIT_TIMEOUT { + return Ok(None); + } else { + return Ok(None); + } + } let records = self.input_handle.read_console_input(pending)?; @@ -544,4 +623,10 @@ impl Terminal for WindowsTerminal { .decode_input_records(&records, &mut |evt| input_queue.push_back(evt)); } } + + fn waker(&self) -> WindowsTerminalWaker { + WindowsTerminalWaker { + handle: self.waker_handle.clone(), + } + } } diff --git a/termwiz/src/widgets/mod.rs b/termwiz/src/widgets/mod.rs index 9b54e2bd039..600c13033ed 100644 --- a/termwiz/src/widgets/mod.rs +++ b/termwiz/src/widgets/mod.rs @@ -238,7 +238,8 @@ impl<'widget> Ui<'widget> { self.do_deliver(id, &WidgetEvent::Input(InputEvent::Mouse(m))) } WidgetEvent::Input(InputEvent::Paste(_)) - | WidgetEvent::Input(InputEvent::Key(_)) => self.do_deliver(id, event), + | WidgetEvent::Input(InputEvent::Key(_)) + | WidgetEvent::Input(InputEvent::Wake) => self.do_deliver(id, event), }; if handled { @@ -316,7 +317,8 @@ impl<'widget> Ui<'widget> { } } WidgetEvent::Input(InputEvent::Key(_)) - | WidgetEvent::Input(InputEvent::Paste(_)) => { + | WidgetEvent::Input(InputEvent::Paste(_)) + | WidgetEvent::Input(InputEvent::Wake) => { if let Some(focus) = self.focused { self.deliver_event(focus, &event); }