Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Implement proper terminal emulation #1

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/alessio/shellescape v1.4.2
github.com/creack/pty v1.1.18
github.com/danielgatis/go-vte v1.0.6
github.com/fatih/color v1.15.0
github.com/mattn/go-isatty v0.0.19
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
Expand All @@ -14,6 +15,7 @@ require (
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/sys v0.12.0
golang.org/x/term v0.12.0
modernc.org/mathutil v1.6.0
modernc.org/memory v1.7.1
)

Expand All @@ -22,6 +24,7 @@ require (
github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/alessio/shellescape v1.4.2 h1:MHPfaU+ddJ0/bYWpgIeUnQUqKrlJ1S7BfEYPM4u
github.com/alessio/shellescape v1.4.2/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30=
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/danielgatis/go-vte v1.0.6 h1:jSEGG0c3Ax8epsIKLWSeUjY4qgXxmKlBCghxK81YRrY=
github.com/danielgatis/go-vte v1.0.6/go.mod h1:B3jdQWPpcskSbJTMTqb+slToseRGyT03pvjb7ILiYJE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -31,6 +33,7 @@ github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:Om
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig=
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/shirou/gopsutil/v3 v3.23.8 h1:xnATPiybo6GgdRoC4YoGnxXZFRc3dqQTGi73oLvvBrE=
github.com/shirou/gopsutil/v3 v3.23.8/go.mod h1:7hmCaBn+2ZwaZOr6jmPBZDfawwMGuo1id3C6aM8EDqQ=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
Expand Down Expand Up @@ -72,5 +75,6 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4=
modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo=
modernc.org/memory v1.7.1 h1:9J+2/GKTlV503mk3yv8QJ6oEpRCUrRy0ad8TXEPoV8M=
modernc.org/memory v1.7.1/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E=
10 changes: 9 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,15 @@ func writeOut(out *Output) {

func toForeground(proc *ProcessResult) (exitCode int) {
proc.output.partsMutex.Lock()

proc.output.shouldPassToParent.value = true
proc.output.shouldPassToParent.becameTrue <- struct{}{}
if !stdoutAndStderrAreTheSame() {
proc.output.shouldPassToParent.becameTrue <- struct{}{}
}

writeOut(proc.output)
proc.output.shouldPassToParent = true

proc.output.partsMutex.Unlock()

return <-proc.exitCode // block until the process exits
Expand Down Expand Up @@ -295,6 +302,7 @@ func main() {
createLimitServer()
}

// Use chann.New() instead of make(chan *ProcessResult) to have a channel with an unbounded dynamically-sized buffer
processes := chann.New[*ProcessResult]()
go func() {
defer processes.Close()
Expand Down
6 changes: 2 additions & 4 deletions recursivetasklimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func createLimitServer() {

// if we've previously crashed (or exited unexpectedly) there could be an old socket file
// left over at the same location if PID rollover happens. Let's try to remove it then to
// be safe if case it exists
// be safe in case it exists
_ = os.Remove(listenPath)

mustSetenv(EnvGparallelChildLimitSocket, listenPath)
Expand Down Expand Up @@ -142,12 +142,10 @@ var recursiveTaskLimitClient = onceValue(func() (client struct {
}

mutex.Unlock()
ch := make(chan error)
go func() { ch <- readOneByte(conn) }()
select {
case <-ctx.Done():
err = ctx.Err()
case err = <-ch:
case err = <-toChannel(func() error { return readOneByte(conn) }):
}
mutex.Lock()

Expand Down
190 changes: 154 additions & 36 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,34 @@ import (

"github.com/alessio/shellescape"
ptyPkg "github.com/creack/pty"
"github.com/karolba/gparallel/terminalscreen"
"github.com/shirou/gopsutil/v3/process"
"golang.org/x/exp/slices"
"golang.org/x/sys/unix"
)

const MAXBUF = 32 * 1024
const MAXBUF = 16 * 1024

type Output struct {
parts []byte
partsMutex sync.Mutex
shouldPassToParent bool
stdoutPipeOrPty *os.File
stderrPipeOrPty *os.File
winchSignal chan os.Signal
streamClosed chan struct{}
allocator chunkAllocator
shouldPassToParent struct {
value bool
becameTrue chan struct{}
}
stdoutPipeOrPty *os.File
stderrPipeOrPty *os.File
stdoutVirtualScreen *terminalscreen.Screen
stderrVirtualScreen *terminalscreen.Screen
winchSignal chan os.Signal
streamClosed chan struct{}
allocator chunkAllocator
}

func NewOutput() *Output {
o := &Output{}
o.shouldPassToParent.becameTrue = make(chan struct{}, 2)
return o
}

type ProcessResult struct {
Expand Down Expand Up @@ -70,11 +82,29 @@ func (proc *ProcessResult) wait() error {
return proc.cmd.Wait()
}

func (out *Output) appendOrWrite(buf []byte, dataFromFd int) {
func (out *Output) appendOrWrite(buf []byte, dataFromFd int, assumedShouldPassToParent bool, screen *terminalscreen.Screen) {
out.partsMutex.Lock()
defer out.partsMutex.Unlock()

if out.shouldPassToParent {
if out.shouldPassToParent.value != assumedShouldPassToParent && screen != nil {
// We know for sure (due to using the mutex) this is now a foreground process that should pass its data
// directly to stdout/stderr - but our calling function doesn't know that yet - oops.
//
// The chunk storage for this process no longer exists, so can't do out.appendChunk()
//
// This wouldn't be a problem (just write to stdout/stderr!) if not for the virtual terminal screen emulation
// present in interactive mode. Writing child's output to stdout/stderr before the screen dumps all its contents
// is bound to produce out-of-order output.
//
// Let's just pass it along to the virtual screen for later processing then.
//
// Could probably get rid of this with some heavy refactoring, to make this function less bloated.
// (if only golang supported using sync.RWMutexes in sync.Cond, this whole situation wouldn't have happened)
screen.Advance(buf)
return
}

if out.shouldPassToParent.value {
_, err := standardFdToFile[dataFromFd].Write(buf)
if err != nil {
log.Fatalf("Syscall write to fd %d: %v\n", dataFromFd, err)
Expand All @@ -101,36 +131,109 @@ func waitIfUsingTooMuchMemory(willSaveBytes int64, out *Output) {
}
}

func readContinuouslyTo(stream io.ReadCloser, out *Output, fileDescriptor int) {
buffer := make([]byte, MAXBUF)
func isEOFFromPtmxDevice(err error) bool {
if err == nil {
return false
}

for {
count, err := stream.Read(buffer)
if err == io.EOF {
return true
}

var pathError *os.PathError
if errors.As(err, &pathError) && pathError.Err == syscall.EIO {
// Returning EIO is Linux's way of saying the other end is closed when reading from a ptmx:
// https://github.com/creack/pty/issues/21
// On BSDs (and macOS) this is signaled by a simple EOF
return true
}

return false
}

if count > 0 {
waitIfUsingTooMuchMemory(chunkSizeWithHeader(buffer[:count]), out)
out.appendOrWrite(buffer[:count], fileDescriptor)
func readContinuouslyToChannel(stream io.Reader, readResult chan withError[[]byte]) {
// Make two buffers, to operate on one and simultaneously read into the other one, without introducing
// race conditions
buffer1 := make([]byte, MAXBUF)
buffer2 := make([]byte, MAXBUF)

for {
count, err := stream.Read(buffer1)
if err != nil {
readResult <- withError[[]byte]{err: err}
return
}
readResult <- withError[[]byte]{value: buffer1[:count]}

count, err = stream.Read(buffer2)
if err != nil {
if err == io.EOF {
haveToClose("child stdout/stderr after EOF", stream)
break
}
if errors.Is(err, fs.ErrClosed) {
break
readResult <- withError[[]byte]{err: err}
return
}
readResult <- withError[[]byte]{value: buffer2[:count]}
}
}

func readContinuouslyTo(stream io.ReadCloser, throughVirtualScreen *terminalscreen.Screen, out *Output, fileDescriptor int) {
// Track out.shouldPassToParent ourselves (and update via a channel) to avoid race conditions
//shouldPassToParent := false // TODO: set to out.shouldPassToParent.value
shouldPassToParent := out.shouldPassToParent.value

readResult := make(chan withError[[]byte])
go readContinuouslyToChannel(stream, readResult)

for {
var buffer []byte
var err error

select {
case countWithError := <-readResult:
buffer = countWithError.value
err = countWithError.err
case <-out.shouldPassToParent.becameTrue:
shouldPassToParent = true
if throughVirtualScreen != nil {
// We became the foreground process ourselves - dump the last visible screen lines (non-scrollback)
// to the output.
throughVirtualScreen.End()
}
var pathError *os.PathError
if errors.As(err, &pathError) && pathError.Err == syscall.EIO {
// Returning EIO is Linux's way of saying the other end is closed when reading from a ptmx:
// https://github.com/creack/pty/issues/21
haveToClose("child stdout/stderr after EIO", stream)
break
}

if len(buffer) > 0 {
if throughVirtualScreen == nil || shouldPassToParent {
waitIfUsingTooMuchMemory(chunkSizeWithHeader(buffer), out)
out.appendOrWrite(buffer, fileDescriptor, shouldPassToParent, throughVirtualScreen)
} else {
throughVirtualScreen.Advance(buffer)
}
}

if throughVirtualScreen != nil && len(throughVirtualScreen.QueuedScrollbackOutput) > 0 {
waitIfUsingTooMuchMemory(chunkSizeWithHeader(throughVirtualScreen.QueuedScrollbackOutput), out)
out.appendOrWrite(throughVirtualScreen.QueuedScrollbackOutput, fileDescriptor, shouldPassToParent, throughVirtualScreen)
throughVirtualScreen.QueuedScrollbackOutput = []byte{}
}

if errors.Is(err, fs.ErrClosed) {
break
} else if isEOFFromPtmxDevice(err) {
haveToClose("child stdout/stderr after EIO", stream)
break
} else if err != nil {
log.Fatalf("error from read: %v\n", err)
}
}

// The process died (or at least closed its output) so need to dump the last visible screen lines into the output
if throughVirtualScreen != nil {
if !throughVirtualScreen.Ended {
throughVirtualScreen.End()
}
waitIfUsingTooMuchMemory(chunkSizeWithHeader(throughVirtualScreen.QueuedScrollbackOutput), out)
out.appendOrWrite(throughVirtualScreen.QueuedScrollbackOutput, fileDescriptor, shouldPassToParent, nil)
throughVirtualScreen.QueuedScrollbackOutput = []byte{}
}

out.streamClosed <- struct{}{}
}

Expand Down Expand Up @@ -166,7 +269,7 @@ func createPty(winSize *ptyPkg.Winsize) (pty, tty *os.File, err error) {
}

// the pty package opens /dev/ptmx without O_NONBLOCK. This makes go spawn a lot of threads
// when reading from lots of ptys in goroutines. Let's work around that by duping the file desctiptor
// when reading from lots of ptys in goroutines. Let's work around that by duping the file descriptor
// into a new one, and creating a new *os.File object with a new async fd
asyncPtyFd, err := unix.FcntlInt(pty.Fd(), unix.F_DUPFD_CLOEXEC, 0)
if err != nil {
Expand All @@ -188,20 +291,29 @@ func createPty(winSize *ptyPkg.Winsize) (pty, tty *os.File, err error) {
}

func runInteractive(cmd *exec.Cmd) *Output {
// set GOMAXPROCS to 1 to make the process running executeAndFlushTty a bit lighter - it's a really lightweight
// job, so it shouldn't consume much resources at all
cmd.Env = os.Environ()
if originalGoMaxProcs, exists := os.LookupEnv("GOMAXPROCS"); exists {
cmd.Env = append(cmd.Env, fmt.Sprintf("_GPARALLEL_ORIGINAL_GOMAXPROCS=%s", originalGoMaxProcs))
}
cmd.Env = append(cmd.Env, "GOMAXPROCS=1")

out := &Output{}
out := NewOutput()
var stdoutTty, stderrTty *os.File

size, err := ptyPkg.GetsizeFull(os.Stdout)
if err != nil {
log.Fatalf("Could not get terminal size: %v\n", err)
}

out.stdoutVirtualScreen = terminalscreen.NewScreen(int(size.Cols), int(size.Rows))
if stdoutAndStderrAreTheSame() {
out.stderrVirtualScreen = out.stdoutVirtualScreen
} else {
out.stderrVirtualScreen = terminalscreen.NewScreen(int(size.Cols), int(size.Rows))
}

out.stdoutPipeOrPty, stdoutTty, err = createPty(size)
if err != nil {
log.Fatalf("Couldn't create a pty for %v's stdout: %v\n", cmd.Args, err)
Expand All @@ -228,18 +340,24 @@ func runInteractive(cmd *exec.Cmd) *Output {
signal.Notify(out.winchSignal, syscall.SIGWINCH)
go func() {
for range out.winchSignal {
// TODO: this should handle just one of stderr/stdout being closed
// TODO: this should handle just one of stderr/stdout being closed and the other resizing

size, err := ptyPkg.GetsizeFull(os.Stdout)
if err != nil {
log.Fatalf("Could not get terminal size on sigwinch: %v\n", err)
}

// Resize the in-kernel virtual pty (to propagate the resize)
_ = ptyPkg.Setsize(out.stdoutPipeOrPty, size)

if !stdoutAndStderrAreTheSame() {
_ = ptyPkg.Setsize(out.stderrPipeOrPty, size)
}

// Resize our own in-process terminal screen representation
out.stdoutVirtualScreen.Resize(int(size.Rows), int(size.Cols))
if !stdoutAndStderrAreTheSame() {
out.stderrVirtualScreen.Resize(int(size.Rows), int(size.Cols))
}
}
}()

Expand All @@ -251,7 +369,7 @@ func runInteractive(cmd *exec.Cmd) *Output {

err = cmd.Start()
if err != nil {
// TODO: take the :2 only if --_execute-and-flush-tty is used - if not using it is even implemented
// TODO: take the :2 in the error message only if --_execute-and-flush-tty is used - if not using it is even implemented
log.Fatalf("Could not start %v: %v\n", shellescape.QuoteCommand(cmd.Args[2:]), err)
}

Expand All @@ -261,8 +379,8 @@ func runInteractive(cmd *exec.Cmd) *Output {
func runNonInteractive(cmd *exec.Cmd) *Output {
var err error
var stdoutWritePipe, stderrWritePipe *os.File
out := &Output{}

out := NewOutput()
out.stdoutPipeOrPty, stdoutWritePipe, err = os.Pipe()
if err != nil {
log.Fatalf("Could not create a pipe for %v's stdout: %v\n", cmd.Args, err)
Expand Down Expand Up @@ -327,9 +445,9 @@ func runWithStdin(command []string, stdin io.Reader) (result *ProcessResult) {
}

result.output.streamClosed = make(chan struct{}, 2)
go readContinuouslyTo(result.output.stdoutPipeOrPty, result.output, syscall.Stdout)
go readContinuouslyTo(result.output.stdoutPipeOrPty, result.output.stdoutVirtualScreen, result.output, syscall.Stdout)
if !stdoutAndStderrAreTheSame() {
go readContinuouslyTo(result.output.stderrPipeOrPty, result.output, syscall.Stderr)
go readContinuouslyTo(result.output.stderrPipeOrPty, result.output.stderrVirtualScreen, result.output, syscall.Stderr)
}

result.startedAt = time.Now()
Expand Down
Loading