Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make windows terminal keep up with real time #11905

Merged
merged 13 commits into from
Jun 3, 2022
Merged
25 changes: 6 additions & 19 deletions lib/client/terminal/terminal_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func New(stdin io.Reader, stdout, stderr io.Writer) (*Terminal, error) {
return &term, nil
}

// InitRaw puts the terminal into raw output mode. If `input` set set, it also
// InitRaw puts the terminal into raw output mode. If `input` is set, it also
// begins capturing raw input events from the Windows API, asynchronously
// writing them to a Pipe emulating a traditional Unix stdin.
// Note that some implementations may replace one or more streams (particularly
Expand Down Expand Up @@ -165,36 +165,23 @@ func (t *Terminal) InitRaw(input bool) error {
cleanup()
}()

// Convert input events into a usable io.Reader.
pipeRead, pipeWrite := io.Pipe()
// emit resize events
t.closeWait.Add(1)
go func() {
defer t.closeWait.Done()

events := tncon.Subscribe()
ch := tncon.SubcribeResizeEvents()
for {
select {
case event := <-events:
switch e := event.(type) {
case tncon.SequenceEvent:
if len(e.Sequence) > 0 {
_, err := pipeWrite.Write(e.Sequence)
if err != nil {
log.Errorf("failed to write input sequence: %+v", err)
_ = t.closer.Close()
return
}
}
case tncon.ResizeEvent:
t.writeEvent(ResizeEvent{})
}
case <-ch:
t.writeEvent(ResizeEvent{})
case <-t.closer.C:
return
}
}
}()

t.stdin = pipeRead
t.stdin = tncon.SequenceReader()
return nil
}

Expand Down
69 changes: 69 additions & 0 deletions lib/client/tncon/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2020 Leonid Titov. All rights reserved.
// MIT licence.
// Version 2020-12-23

package tncon

import (
"io"
)

// bufferedChannelPipe is a synchronous buffered pipe implemented with a channel. This pipe
// is much more efficient than the standard io.Pipe, and can keep up with real-time
// shell output, which is needed for the lib/client/tncon implementation.
//
// Derived from https://github.com/latitov/milkthisbuffer/blob/main/milkthisbuffer.go
type bufferedChannelPipe struct {
ch chan byte
closed chan struct{}
}

func newBufferedChannelPipe(len int) *bufferedChannelPipe {
return &bufferedChannelPipe{
ch: make(chan byte, len),
closed: make(chan struct{}),
}
}

// Write will write all of p to the buffer unless the buffer is closed
func (b *bufferedChannelPipe) Write(p []byte) (n int, err error) {
for n = 0; n < len(p); n++ {
select {
// blocking behaviour
case b.ch <- p[n]:
case <-b.closed:
return n, io.EOF
}
}
return n, nil
}

// Read will always read at least one byte from the buffer unless the buffer is closed
func (b *bufferedChannelPipe) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}

// blocking behaviour
select {
case p[0] = <-b.ch:
case <-b.closed:
return 0, io.EOF
}

for n = 1; n < len(p); n++ {
select {
case p[n] = <-b.ch:
case <-b.closed:
return n, io.EOF
default:
return n, nil
}
}
return n, nil
}

func (b *bufferedChannelPipe) Close() error {
close(b.closed)
return nil
}
176 changes: 176 additions & 0 deletions lib/client/tncon/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
Copyright 2022 Gravitational, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tncon

import (
"fmt"
"io"
"testing"

"github.com/stretchr/testify/require"
)

Joerger marked this conversation as resolved.
Show resolved Hide resolved
func TestBufferedChannelPipeClose(t *testing.T) {
buffer := newBufferedChannelPipe(0)
require.NoError(t, buffer.Close())

// Reading from a closed channel should return EOF
n, err := buffer.Read(make([]byte, 1))
require.Equal(t, 0, n)
require.Error(t, err)
require.ErrorIs(t, err, io.EOF)

// Reading from a closed channel should return ErrClosedPipe
n, err = buffer.Write(make([]byte, 1))
require.Equal(t, 0, n)
require.Error(t, err)
require.ErrorIs(t, err, io.EOF)
}

func TestBufferedChannelPipeWrite(t *testing.T) {
// With a sufficient buffer, write should successfully
// write to the channel without blocking
for _, tc := range []struct {
buffer int
len int
}{
{
buffer: 0,
len: 0,
}, {
buffer: 0,
len: 10,
}, {
buffer: 10,
len: 10,
}, {
buffer: 10,
len: 100,
}, {
buffer: 100,
len: 100,
},
} {
t.Run(fmt.Sprintf("buffer=%v, len=%v", tc.buffer, tc.len), func(t *testing.T) {
buffer := newBufferedChannelPipe(tc.buffer)
t.Cleanup(func() { require.NoError(t, buffer.Close()) })

// drain channel
rc := make(chan []byte)
go func() {
read := make([]byte, tc.len)
for i := range read {
read[i] = <-buffer.ch
}
rc <- read
}()

p := make([]byte, tc.len)
for n := 0; n < tc.len; n++ {
p[n] = byte(n)
}

n, err := buffer.Write(p)
require.NoError(t, err)
require.Equal(t, tc.len, n)
require.Equal(t, p, <-rc)
})
}
}

func TestBufferedChannelPipeRead(t *testing.T) {
for _, tc := range []struct {
desc string
buffer int
writeLen int
readLen int
expectN int
}{
{
desc: "empty read",
buffer: 0,
writeLen: 0,
readLen: 0,
expectN: 0,
}, {
desc: "one byte read",
buffer: 0,
writeLen: 1,
readLen: 1,
expectN: 1,
}, {
desc: "read with equal buffer",
buffer: 10,
writeLen: 10,
readLen: 10,
expectN: 10,
}, {
desc: "large read with large buffer",
buffer: 100,
writeLen: 10000,
readLen: 10000,
expectN: 10000,
},
} {
t.Run(tc.desc, func(t *testing.T) {
buffer := newBufferedChannelPipe(tc.buffer)
t.Cleanup(func() { require.NoError(t, buffer.Close()) })

write := make([]byte, tc.writeLen)
for i := 0; i < tc.writeLen; i++ {
write[i] = byte(i)
}

// fill channel
go buffer.Write(write)

p := make([]byte, tc.readLen)
n, err := io.ReadFull(buffer, p)
require.NoError(t, err)
require.Equal(t, tc.expectN, n)
require.Equal(t, write[:n], p[:n])
})
}
}

func BenchmarkBufferedChannelPipe(b *testing.B) {
Joerger marked this conversation as resolved.
Show resolved Hide resolved
for _, s := range []int{1, 10, 100, 500, 1000} {
data := make([]byte, 1000)
b.Run(fmt.Sprintf("size=%d", s), func(b *testing.B) {
b.StopTimer() // stop timer during setup
buffer := newBufferedChannelPipe(s)
b.Cleanup(func() { require.NoError(b, buffer.Close()) })

errCh := make(chan error)
go func() {
readBuffer := make([]byte, b.N*len(data))
_, err := io.ReadFull(buffer, readBuffer)
errCh <- err
}()

// benchmark write+read
b.StartTimer()
for n := 0; n < b.N; n++ {
written, err := buffer.Write(data)
require.NoError(b, err)
require.Equal(b, len(data), written)
}
require.NoError(b, <-errCh)
b.StopTimer()
})
}
}
4 changes: 2 additions & 2 deletions lib/client/tncon/tncon.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ WriteToBuffer(char* source, size_t len)
{
// NOTE: Modified to emit an event to the Go lib rather than mutate a
// global buffer.
writeSequenceEvent(source, len);
writeSequence(source, len);

return len;
}
Expand Down Expand Up @@ -99,7 +99,7 @@ ReadConsoleForTermEmul(HANDLE hInput, HANDLE hQuitEvent)

// NOTE: modified here to emit events directly
case WINDOW_BUFFER_SIZE_EVENT:
writeResizeEvent(inputRecord.Event.WindowBufferSizeEvent.dwSize);
notifyResizeEvent();
break;
case FOCUS_EVENT:
break;
Expand Down
Loading