From f23a29ff2f33bf3b346c7ba7dfe8e44c25df3553 Mon Sep 17 00:00:00 2001 From: Brian Joerger Date: Fri, 3 Jun 2022 13:09:02 -0700 Subject: [PATCH] Make windows terminal keep up with real time (#11905) Use a buffered channel pipe to increase the speed of windows stdin closer to real-time. The speed increase makes it so that VT sequences are captured properly by windows. --- lib/client/terminal/terminal_windows.go | 25 +--- lib/client/tncon/buffer.go | 69 ++++++++++ lib/client/tncon/buffer_test.go | 176 ++++++++++++++++++++++++ lib/client/tncon/tncon.c | 4 +- lib/client/tncon/tncon.go | 103 +++++++------- 5 files changed, 300 insertions(+), 77 deletions(-) create mode 100644 lib/client/tncon/buffer.go create mode 100644 lib/client/tncon/buffer_test.go diff --git a/lib/client/terminal/terminal_windows.go b/lib/client/terminal/terminal_windows.go index b167fff263eb6..488a85cd1cdb2 100644 --- a/lib/client/terminal/terminal_windows.go +++ b/lib/client/terminal/terminal_windows.go @@ -137,7 +137,7 @@ func New(stdin io.Reader, stdout, stderr io.Writer) (*Terminal, error) { return &term, nil } -// InitRaw puts the terminal into raw output mode. If `input` set set, it also +// InitRaw puts the terminal into raw output mode. If `input` is set, it also // begins capturing raw input events from the Windows API, asynchronously // writing them to a Pipe emulating a traditional Unix stdin. // Note that some implementations may replace one or more streams (particularly @@ -165,36 +165,23 @@ func (t *Terminal) InitRaw(input bool) error { cleanup() }() - // Convert input events into a usable io.Reader. - pipeRead, pipeWrite := io.Pipe() + // emit resize events t.closeWait.Add(1) go func() { defer t.closeWait.Done() - events := tncon.Subscribe() + ch := tncon.SubcribeResizeEvents() for { select { - case event := <-events: - switch e := event.(type) { - case tncon.SequenceEvent: - if len(e.Sequence) > 0 { - _, err := pipeWrite.Write(e.Sequence) - if err != nil { - log.Errorf("failed to write input sequence: %+v", err) - _ = t.closer.Close() - return - } - } - case tncon.ResizeEvent: - t.writeEvent(ResizeEvent{}) - } + case <-ch: + t.writeEvent(ResizeEvent{}) case <-t.closer.C: return } } }() - t.stdin = pipeRead + t.stdin = tncon.SequenceReader() return nil } diff --git a/lib/client/tncon/buffer.go b/lib/client/tncon/buffer.go new file mode 100644 index 0000000000000..f7edbcc80251c --- /dev/null +++ b/lib/client/tncon/buffer.go @@ -0,0 +1,69 @@ +// Copyright (c) 2020 Leonid Titov. All rights reserved. +// MIT licence. +// Version 2020-12-23 + +package tncon + +import ( + "io" +) + +// bufferedChannelPipe is a synchronous buffered pipe implemented with a channel. This pipe +// is much more efficient than the standard io.Pipe, and can keep up with real-time +// shell output, which is needed for the lib/client/tncon implementation. +// +// Derived from https://github.com/latitov/milkthisbuffer/blob/main/milkthisbuffer.go +type bufferedChannelPipe struct { + ch chan byte + closed chan struct{} +} + +func newBufferedChannelPipe(len int) *bufferedChannelPipe { + return &bufferedChannelPipe{ + ch: make(chan byte, len), + closed: make(chan struct{}), + } +} + +// Write will write all of p to the buffer unless the buffer is closed +func (b *bufferedChannelPipe) Write(p []byte) (n int, err error) { + for n = 0; n < len(p); n++ { + select { + // blocking behaviour + case b.ch <- p[n]: + case <-b.closed: + return n, io.EOF + } + } + return n, nil +} + +// Read will always read at least one byte from the buffer unless the buffer is closed +func (b *bufferedChannelPipe) Read(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, nil + } + + // blocking behaviour + select { + case p[0] = <-b.ch: + case <-b.closed: + return 0, io.EOF + } + + for n = 1; n < len(p); n++ { + select { + case p[n] = <-b.ch: + case <-b.closed: + return n, io.EOF + default: + return n, nil + } + } + return n, nil +} + +func (b *bufferedChannelPipe) Close() error { + close(b.closed) + return nil +} diff --git a/lib/client/tncon/buffer_test.go b/lib/client/tncon/buffer_test.go new file mode 100644 index 0000000000000..989353e164caf --- /dev/null +++ b/lib/client/tncon/buffer_test.go @@ -0,0 +1,176 @@ +/* +Copyright 2022 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tncon + +import ( + "fmt" + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBufferedChannelPipeClose(t *testing.T) { + buffer := newBufferedChannelPipe(0) + require.NoError(t, buffer.Close()) + + // Reading from a closed channel should return EOF + n, err := buffer.Read(make([]byte, 1)) + require.Equal(t, 0, n) + require.Error(t, err) + require.ErrorIs(t, err, io.EOF) + + // Reading from a closed channel should return ErrClosedPipe + n, err = buffer.Write(make([]byte, 1)) + require.Equal(t, 0, n) + require.Error(t, err) + require.ErrorIs(t, err, io.EOF) +} + +func TestBufferedChannelPipeWrite(t *testing.T) { + // With a sufficient buffer, write should successfully + // write to the channel without blocking + for _, tc := range []struct { + buffer int + len int + }{ + { + buffer: 0, + len: 0, + }, { + buffer: 0, + len: 10, + }, { + buffer: 10, + len: 10, + }, { + buffer: 10, + len: 100, + }, { + buffer: 100, + len: 100, + }, + } { + t.Run(fmt.Sprintf("buffer=%v, len=%v", tc.buffer, tc.len), func(t *testing.T) { + buffer := newBufferedChannelPipe(tc.buffer) + t.Cleanup(func() { require.NoError(t, buffer.Close()) }) + + // drain channel + rc := make(chan []byte) + go func() { + read := make([]byte, tc.len) + for i := range read { + read[i] = <-buffer.ch + } + rc <- read + }() + + p := make([]byte, tc.len) + for n := 0; n < tc.len; n++ { + p[n] = byte(n) + } + + n, err := buffer.Write(p) + require.NoError(t, err) + require.Equal(t, tc.len, n) + require.Equal(t, p, <-rc) + }) + } +} + +func TestBufferedChannelPipeRead(t *testing.T) { + for _, tc := range []struct { + desc string + buffer int + writeLen int + readLen int + expectN int + }{ + { + desc: "empty read", + buffer: 0, + writeLen: 0, + readLen: 0, + expectN: 0, + }, { + desc: "one byte read", + buffer: 0, + writeLen: 1, + readLen: 1, + expectN: 1, + }, { + desc: "read with equal buffer", + buffer: 10, + writeLen: 10, + readLen: 10, + expectN: 10, + }, { + desc: "large read with large buffer", + buffer: 100, + writeLen: 10000, + readLen: 10000, + expectN: 10000, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + buffer := newBufferedChannelPipe(tc.buffer) + t.Cleanup(func() { require.NoError(t, buffer.Close()) }) + + write := make([]byte, tc.writeLen) + for i := 0; i < tc.writeLen; i++ { + write[i] = byte(i) + } + + // fill channel + go buffer.Write(write) + + p := make([]byte, tc.readLen) + n, err := io.ReadFull(buffer, p) + require.NoError(t, err) + require.Equal(t, tc.expectN, n) + require.Equal(t, write[:n], p[:n]) + }) + } +} + +func BenchmarkBufferedChannelPipe(b *testing.B) { + for _, s := range []int{1, 10, 100, 500, 1000} { + data := make([]byte, 1000) + b.Run(fmt.Sprintf("size=%d", s), func(b *testing.B) { + b.StopTimer() // stop timer during setup + buffer := newBufferedChannelPipe(s) + b.Cleanup(func() { require.NoError(b, buffer.Close()) }) + + errCh := make(chan error) + go func() { + readBuffer := make([]byte, b.N*len(data)) + _, err := io.ReadFull(buffer, readBuffer) + errCh <- err + }() + + // benchmark write+read + b.StartTimer() + for n := 0; n < b.N; n++ { + written, err := buffer.Write(data) + require.NoError(b, err) + require.Equal(b, len(data), written) + } + require.NoError(b, <-errCh) + b.StopTimer() + }) + } +} diff --git a/lib/client/tncon/tncon.c b/lib/client/tncon/tncon.c index d4b7bb302b67d..045de71f2e0fe 100644 --- a/lib/client/tncon/tncon.c +++ b/lib/client/tncon/tncon.c @@ -54,7 +54,7 @@ WriteToBuffer(char* source, size_t len) { // NOTE: Modified to emit an event to the Go lib rather than mutate a // global buffer. - writeSequenceEvent(source, len); + writeSequence(source, len); return len; } @@ -99,7 +99,7 @@ ReadConsoleForTermEmul(HANDLE hInput, HANDLE hQuitEvent) // NOTE: modified here to emit events directly case WINDOW_BUFFER_SIZE_EVENT: - writeResizeEvent(inputRecord.Event.WindowBufferSizeEvent.dwSize); + notifyResizeEvent(); break; case FOCUS_EVENT: break; diff --git a/lib/client/tncon/tncon.go b/lib/client/tncon/tncon.go index 1a691379a7da2..b67953b74494f 100644 --- a/lib/client/tncon/tncon.go +++ b/lib/client/tncon/tncon.go @@ -29,78 +29,58 @@ import "C" import ( "fmt" + "io" "sync" "unsafe" + + "github.com/gravitational/trace" ) +// A buffer of 100 should provide ample buffer to hold several VT +// sequences (which are 5 bytes each max) and output them to the +// terminal in real time. +const sequenceBufferSize = 100 + var ( - subscribers []chan interface{} - subscribersMutex sync.Mutex + sequenceBuffer *bufferedChannelPipe + + resizeEventSubscribers []chan struct{} + resizeEventSubscribersMutex sync.Mutex running bool = false runningMutex sync.Mutex runningQuitHandle C.HANDLE ) -// SequenceEvent is emitted when one or more key sequences are generated. This -// implementation generally produces many 1-byte events rather than one event -// per keystroke unless VT sequence translation is enabled. -type SequenceEvent struct { - Sequence []byte +func SequenceReader() io.Reader { + return sequenceBuffer } -// ResizeEvent is emitted when the window size has been modified. The semantics -// of this event may vary depending on the current terminal and its flags: -// - `cmd.exe` tends not to emit vertical resize events, and horizontal events -// have nonsensical height (`Y`) values. -// - `powershell.exe` emits events reliably, but height values are still -// insane. -// - The new Windows Terminal app emits sane events for both horizontal and -// vertical resize inputs. -type ResizeEvent struct { - X int16 - - // y is the resized height. Note that depending on console mode, this - // number may not be sensible and events may not trigger on vertical - // resize. - Y int16 +//export writeSequence +func writeSequence(addr *C.char, len C.int) { + bytes := C.GoBytes(unsafe.Pointer(addr), len) + sequenceBuffer.Write(bytes) } -// writeEvent dispatches an event to all listeners. -func writeEvent(event interface{}) { - subscribersMutex.Lock() - defer subscribersMutex.Unlock() +// SubcribeResizeEvents creates a new channel from which to receive console input events. +func SubcribeResizeEvents() chan struct{} { + resizeEventSubscribersMutex.Lock() + defer resizeEventSubscribersMutex.Unlock() - for _, sub := range subscribers { - sub <- event - } -} - -//export writeSequenceEvent -func writeSequenceEvent(addr *C.char, len C.int) { - bytes := C.GoBytes(unsafe.Pointer(addr), len) - writeEvent(SequenceEvent{ - Sequence: bytes, - }) -} + ch := make(chan struct{}) + resizeEventSubscribers = append(resizeEventSubscribers, ch) -//export writeResizeEvent -func writeResizeEvent(size C.COORD) { - writeEvent(ResizeEvent{ - X: int16(size.X), - Y: int16(size.Y), - }) + return ch } -// Subscribe creates a new channel from which to receive console input events. -func Subscribe() chan interface{} { - subscribersMutex.Lock() - defer subscribersMutex.Unlock() +//export notifyResizeEvent +func notifyResizeEvent() { + resizeEventSubscribersMutex.Lock() + defer resizeEventSubscribersMutex.Unlock() - ch := make(chan interface{}) - subscribers = append(subscribers, ch) - - return ch + for _, sub := range resizeEventSubscribers { + sub <- struct{}{} + } } // readInputContinuous is a blocking call that continuously reads console @@ -109,15 +89,20 @@ func Subscribe() chan interface{} { func readInputContinuous(quitHandle C.HANDLE) error { C.ReadInputContinuous(quitHandle) + // Close the sequenceBuffer (terminal stdin) + if err := sequenceBuffer.Close(); err != nil { + return trace.Wrap(err) + } + // Once finished, close all existing subscriber channels to notify them // of the close (they can resubscribe if it's ever restarted). - subscribersMutex.Lock() - defer subscribersMutex.Unlock() + resizeEventSubscribersMutex.Lock() + defer resizeEventSubscribersMutex.Unlock() - for _, ch := range subscribers { + for _, ch := range resizeEventSubscribers { close(ch) } - subscribers = subscribers[:0] + resizeEventSubscribers = resizeEventSubscribers[:0] runningMutex.Lock() defer runningMutex.Unlock() @@ -154,6 +139,12 @@ func Start() error { running = true runningQuitHandle = C.CreateEventA(nil, C.TRUE, C.FALSE, nil) + // Adding a buffer increases the speed of reads by a great amount, + // since waiting on channel sends is the main chokepoint. Without + // a sufficient buffer, the individual keystrokes won't be transmitted + // quickly enough for them to be grouped as a VT sequence by Windows. + sequenceBuffer = newBufferedChannelPipe(sequenceBufferSize) + go readInputContinuous(runningQuitHandle) return nil