Skip to content

Commit

Permalink
Make windows terminal keep up with real time (#11905)
Browse files Browse the repository at this point in the history
Use a buffered channel pipe to increase the speed of windows stdin closer to real-time. The speed increase makes it so that VT sequences are captured properly by windows.
  • Loading branch information
Joerger authored Jun 3, 2022
1 parent 6a693b9 commit f23a29f
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 77 deletions.
25 changes: 6 additions & 19 deletions lib/client/terminal/terminal_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func New(stdin io.Reader, stdout, stderr io.Writer) (*Terminal, error) {
return &term, nil
}

// InitRaw puts the terminal into raw output mode. If `input` set set, it also
// InitRaw puts the terminal into raw output mode. If `input` is set, it also
// begins capturing raw input events from the Windows API, asynchronously
// writing them to a Pipe emulating a traditional Unix stdin.
// Note that some implementations may replace one or more streams (particularly
Expand Down Expand Up @@ -165,36 +165,23 @@ func (t *Terminal) InitRaw(input bool) error {
cleanup()
}()

// Convert input events into a usable io.Reader.
pipeRead, pipeWrite := io.Pipe()
// emit resize events
t.closeWait.Add(1)
go func() {
defer t.closeWait.Done()

events := tncon.Subscribe()
ch := tncon.SubcribeResizeEvents()
for {
select {
case event := <-events:
switch e := event.(type) {
case tncon.SequenceEvent:
if len(e.Sequence) > 0 {
_, err := pipeWrite.Write(e.Sequence)
if err != nil {
log.Errorf("failed to write input sequence: %+v", err)
_ = t.closer.Close()
return
}
}
case tncon.ResizeEvent:
t.writeEvent(ResizeEvent{})
}
case <-ch:
t.writeEvent(ResizeEvent{})
case <-t.closer.C:
return
}
}
}()

t.stdin = pipeRead
t.stdin = tncon.SequenceReader()
return nil
}

Expand Down
69 changes: 69 additions & 0 deletions lib/client/tncon/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2020 Leonid Titov. All rights reserved.
// MIT licence.
// Version 2020-12-23

package tncon

import (
"io"
)

// bufferedChannelPipe is a synchronous buffered pipe implemented with a channel. This pipe
// is much more efficient than the standard io.Pipe, and can keep up with real-time
// shell output, which is needed for the lib/client/tncon implementation.
//
// Derived from https://github.com/latitov/milkthisbuffer/blob/main/milkthisbuffer.go
type bufferedChannelPipe struct {
ch chan byte
closed chan struct{}
}

func newBufferedChannelPipe(len int) *bufferedChannelPipe {
return &bufferedChannelPipe{
ch: make(chan byte, len),
closed: make(chan struct{}),
}
}

// Write will write all of p to the buffer unless the buffer is closed
func (b *bufferedChannelPipe) Write(p []byte) (n int, err error) {
for n = 0; n < len(p); n++ {
select {
// blocking behaviour
case b.ch <- p[n]:
case <-b.closed:
return n, io.EOF
}
}
return n, nil
}

// Read will always read at least one byte from the buffer unless the buffer is closed
func (b *bufferedChannelPipe) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}

// blocking behaviour
select {
case p[0] = <-b.ch:
case <-b.closed:
return 0, io.EOF
}

for n = 1; n < len(p); n++ {
select {
case p[n] = <-b.ch:
case <-b.closed:
return n, io.EOF
default:
return n, nil
}
}
return n, nil
}

func (b *bufferedChannelPipe) Close() error {
close(b.closed)
return nil
}
176 changes: 176 additions & 0 deletions lib/client/tncon/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
Copyright 2022 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tncon

import (
"fmt"
"io"
"testing"

"github.com/stretchr/testify/require"
)

func TestBufferedChannelPipeClose(t *testing.T) {
buffer := newBufferedChannelPipe(0)
require.NoError(t, buffer.Close())

// Reading from a closed channel should return EOF
n, err := buffer.Read(make([]byte, 1))
require.Equal(t, 0, n)
require.Error(t, err)
require.ErrorIs(t, err, io.EOF)

// Reading from a closed channel should return ErrClosedPipe
n, err = buffer.Write(make([]byte, 1))
require.Equal(t, 0, n)
require.Error(t, err)
require.ErrorIs(t, err, io.EOF)
}

func TestBufferedChannelPipeWrite(t *testing.T) {
// With a sufficient buffer, write should successfully
// write to the channel without blocking
for _, tc := range []struct {
buffer int
len int
}{
{
buffer: 0,
len: 0,
}, {
buffer: 0,
len: 10,
}, {
buffer: 10,
len: 10,
}, {
buffer: 10,
len: 100,
}, {
buffer: 100,
len: 100,
},
} {
t.Run(fmt.Sprintf("buffer=%v, len=%v", tc.buffer, tc.len), func(t *testing.T) {
buffer := newBufferedChannelPipe(tc.buffer)
t.Cleanup(func() { require.NoError(t, buffer.Close()) })

// drain channel
rc := make(chan []byte)
go func() {
read := make([]byte, tc.len)
for i := range read {
read[i] = <-buffer.ch
}
rc <- read
}()

p := make([]byte, tc.len)
for n := 0; n < tc.len; n++ {
p[n] = byte(n)
}

n, err := buffer.Write(p)
require.NoError(t, err)
require.Equal(t, tc.len, n)
require.Equal(t, p, <-rc)
})
}
}

func TestBufferedChannelPipeRead(t *testing.T) {
for _, tc := range []struct {
desc string
buffer int
writeLen int
readLen int
expectN int
}{
{
desc: "empty read",
buffer: 0,
writeLen: 0,
readLen: 0,
expectN: 0,
}, {
desc: "one byte read",
buffer: 0,
writeLen: 1,
readLen: 1,
expectN: 1,
}, {
desc: "read with equal buffer",
buffer: 10,
writeLen: 10,
readLen: 10,
expectN: 10,
}, {
desc: "large read with large buffer",
buffer: 100,
writeLen: 10000,
readLen: 10000,
expectN: 10000,
},
} {
t.Run(tc.desc, func(t *testing.T) {
buffer := newBufferedChannelPipe(tc.buffer)
t.Cleanup(func() { require.NoError(t, buffer.Close()) })

write := make([]byte, tc.writeLen)
for i := 0; i < tc.writeLen; i++ {
write[i] = byte(i)
}

// fill channel
go buffer.Write(write)

p := make([]byte, tc.readLen)
n, err := io.ReadFull(buffer, p)
require.NoError(t, err)
require.Equal(t, tc.expectN, n)
require.Equal(t, write[:n], p[:n])
})
}
}

func BenchmarkBufferedChannelPipe(b *testing.B) {
for _, s := range []int{1, 10, 100, 500, 1000} {
data := make([]byte, 1000)
b.Run(fmt.Sprintf("size=%d", s), func(b *testing.B) {
b.StopTimer() // stop timer during setup
buffer := newBufferedChannelPipe(s)
b.Cleanup(func() { require.NoError(b, buffer.Close()) })

errCh := make(chan error)
go func() {
readBuffer := make([]byte, b.N*len(data))
_, err := io.ReadFull(buffer, readBuffer)
errCh <- err
}()

// benchmark write+read
b.StartTimer()
for n := 0; n < b.N; n++ {
written, err := buffer.Write(data)
require.NoError(b, err)
require.Equal(b, len(data), written)
}
require.NoError(b, <-errCh)
b.StopTimer()
})
}
}
4 changes: 2 additions & 2 deletions lib/client/tncon/tncon.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ WriteToBuffer(char* source, size_t len)
{
// NOTE: Modified to emit an event to the Go lib rather than mutate a
// global buffer.
writeSequenceEvent(source, len);
writeSequence(source, len);

return len;
}
Expand Down Expand Up @@ -99,7 +99,7 @@ ReadConsoleForTermEmul(HANDLE hInput, HANDLE hQuitEvent)

// NOTE: modified here to emit events directly
case WINDOW_BUFFER_SIZE_EVENT:
writeResizeEvent(inputRecord.Event.WindowBufferSizeEvent.dwSize);
notifyResizeEvent();
break;
case FOCUS_EVENT:
break;
Expand Down
Loading

0 comments on commit f23a29f

Please sign in to comment.