diff --git a/client/build_test.go b/client/build_test.go index 22c9275be78e..93edb22f997e 100644 --- a/client/build_test.go +++ b/client/build_test.go @@ -3,10 +3,12 @@ package client import ( "bytes" "context" + "fmt" "io" "io/ioutil" "os" "path/filepath" + "strings" "testing" "time" @@ -37,6 +39,8 @@ func TestClientGatewayIntegration(t *testing.T) { testClientGatewayContainerPID1Fail, testClientGatewayContainerPID1Exit, testClientGatewayContainerMounts, + testClientGatewayContainerPID1Tty, + testClientGatewayContainerExecTty, }, integration.WithMirroredImages(integration.OfficialImages("busybox:latest"))) } @@ -718,6 +722,234 @@ func testClientGatewayContainerMounts(t *testing.T, sb integration.Sandbox) { checkAllReleasable(t, c, sb, true) } +// testClientGatewayContainerPID1Tty is testing that we can get a tty via +// a container pid1, executor.Run +func testClientGatewayContainerPID1Tty(t *testing.T, sb integration.Sandbox) { + requiresLinux(t) + ctx := context.TODO() + + c, err := New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + product := "buildkit_test" + + inputR, inputW := io.Pipe() + output := bytes.NewBuffer(nil) + + b := func(ctx context.Context, c client.Client) (*client.Result, error) { + ctx, timeout := context.WithTimeout(ctx, 10*time.Second) + defer timeout() + + st := llb.Image("busybox:latest") + + def, err := st.Marshal(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal state") + } + + r, err := c.Solve(ctx, client.SolveRequest{ + Definition: def.ToPB(), + }) + if err != nil { + return nil, errors.Wrap(err, "failed to solve") + } + + ctr, err := c.NewContainer(ctx, client.NewContainerRequest{ + Mounts: []client.Mount{{ + Dest: "/", + MountType: pb.MountType_BIND, + Ref: r.Ref, + }}, + }) + require.NoError(t, err) + defer ctr.Release(ctx) + + prompt := newTestPrompt(ctx, t, inputW, output) + pid1, err := ctr.Start(ctx, client.StartRequest{ + Args: []string{"sh"}, + Tty: true, + Stdin: inputR, + Stdout: &nopCloser{output}, + Stderr: &nopCloser{output}, + Env: []string{fmt.Sprintf("PS1=%s", prompt.String())}, + }) + require.NoError(t, err) + err = pid1.Resize(ctx, client.WinSize{Rows: 40, Cols: 80}) + require.NoError(t, err) + prompt.SendExpect("ttysize", "80 40") + prompt.Send("cd /tmp") + prompt.SendExpect("pwd", "/tmp") + prompt.Send("echo foobar > newfile") + prompt.SendExpect("cat /tmp/newfile", "foobar") + err = pid1.Resize(ctx, client.WinSize{Rows: 60, Cols: 100}) + require.NoError(t, err) + prompt.SendExpect("ttysize", "100 60") + prompt.SendExit(99) + + err = pid1.Wait() + var exitError *errdefs.ExitError + require.True(t, errors.As(err, &exitError)) + require.Equal(t, uint32(99), exitError.ExitCode) + + return &client.Result{}, err + } + + _, err = c.Build(ctx, SolveOpt{}, product, b, nil) + require.Error(t, err) + + inputW.Close() + inputR.Close() + + checkAllReleasable(t, c, sb, true) +} + +type testPrompt struct { + ctx context.Context + t *testing.T + output *bytes.Buffer + input io.Writer + prompt string + pos int +} + +func newTestPrompt(ctx context.Context, t *testing.T, input io.Writer, output *bytes.Buffer) *testPrompt { + return &testPrompt{ + ctx: ctx, + t: t, + input: input, + output: output, + prompt: "% ", + } +} + +func (p *testPrompt) String() string { return p.prompt } + +func (p *testPrompt) SendExit(status int) { + p.input.Write([]byte(fmt.Sprintf("exit %d\n", status))) +} + +func (p *testPrompt) Send(cmd string) { + p.input.Write([]byte(cmd + "\n")) + p.wait(p.prompt) +} + +func (p *testPrompt) SendExpect(cmd, expected string) { + for { + p.input.Write([]byte(cmd + "\n")) + response := p.wait(p.prompt) + if strings.Contains(response, expected) { + return + } + } +} + +func (p *testPrompt) wait(msg string) string { + for { + newOutput := p.output.String()[p.pos:] + if strings.Contains(newOutput, msg) { + p.pos += len(newOutput) + return newOutput + } + select { + case <-p.ctx.Done(): + p.t.Logf("Output at timeout: %s", p.output.String()) + p.t.Fatalf("Timeout waiting for %q", msg) + case <-time.After(100 * time.Millisecond): + } + } +} + +// testClientGatewayContainerExecTty is testing that we can get a tty via +// executor.Exec (secondary process) +func testClientGatewayContainerExecTty(t *testing.T, sb integration.Sandbox) { + if sb.Rootless() { + // TODO fix this + // We get `panic: cannot statfs cgroup root` when running this test + // with runc-rootless + t.Skip("Skipping runc-rootless for cgroup error") + } + requiresLinux(t) + ctx := context.TODO() + + c, err := New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + product := "buildkit_test" + + inputR, inputW := io.Pipe() + output := bytes.NewBuffer(nil) + b := func(ctx context.Context, c client.Client) (*client.Result, error) { + ctx, timeout := context.WithTimeout(ctx, 10*time.Second) + defer timeout() + st := llb.Image("busybox:latest") + + def, err := st.Marshal(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal state") + } + + r, err := c.Solve(ctx, client.SolveRequest{ + Definition: def.ToPB(), + }) + if err != nil { + return nil, errors.Wrap(err, "failed to solve") + } + + ctr, err := c.NewContainer(ctx, client.NewContainerRequest{ + Mounts: []client.Mount{{ + Dest: "/", + MountType: pb.MountType_BIND, + Ref: r.Ref, + }}, + }) + require.NoError(t, err) + + pid1, err := ctr.Start(ctx, client.StartRequest{ + Args: []string{"sleep", "10"}, + }) + require.NoError(t, err) + + defer pid1.Wait() + defer ctr.Release(ctx) + + prompt := newTestPrompt(ctx, t, inputW, output) + pid2, err := ctr.Start(ctx, client.StartRequest{ + Args: []string{"sh"}, + Tty: true, + Stdin: inputR, + Stdout: &nopCloser{output}, + Stderr: &nopCloser{output}, + Env: []string{fmt.Sprintf("PS1=%s", prompt.String())}, + }) + require.NoError(t, err) + + err = pid2.Resize(ctx, client.WinSize{Rows: 40, Cols: 80}) + require.NoError(t, err) + prompt.SendExpect("ttysize", "80 40") + prompt.Send("cd /tmp") + prompt.SendExpect("pwd", "/tmp") + prompt.Send("echo foobar > newfile") + prompt.SendExpect("cat /tmp/newfile", "foobar") + err = pid2.Resize(ctx, client.WinSize{Rows: 60, Cols: 100}) + require.NoError(t, err) + prompt.SendExpect("ttysize", "100 60") + prompt.SendExit(99) + + return &client.Result{}, pid2.Wait() + } + + _, err = c.Build(ctx, SolveOpt{}, product, b, nil) + require.Error(t, err) + require.Regexp(t, "exit code: 99|runc did not terminate successfully", err.Error()) + + inputW.Close() + inputR.Close() + + checkAllReleasable(t, c, sb, true) +} + type nopCloser struct { io.Writer } diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index 9849070d7403..60c07f54f653 100644 --- a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -272,10 +272,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, } } - if meta.Tty { - return errors.New("tty with runc not implemented") - } - + spec.Process.Terminal = meta.Tty spec.Process.OOMScoreAdj = w.oomScoreAdj if w.rootless { if err := rootlessspecconv.ToRootless(spec); err != nil { @@ -326,10 +323,8 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, close(started) }) } - status, err := w.runc.Run(runCtx, id, bundle, &runc.CreateOpts{ - IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}, - NoPivot: w.noPivot, - }) + + status, err := w.run(runCtx, id, bundle, process) close(ended) if status != 0 || err != nil { @@ -413,21 +408,14 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro spec.Process.Env = process.Meta.Env } - err = w.runc.Exec(ctx, id, *spec.Process, &runc.ExecOpts{ - IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}, - }) - - var exitError *exec.ExitError - if errors.As(err, &exitError) { - err = &errdefs.ExitError{ - ExitCode: uint32(exitError.ExitCode()), - Err: err, - } - return err - } else if err != nil { - return err + status, err := w.exec(ctx, id, state.Bundle, spec.Process, process) + if status == 0 && err == nil { + return nil + } + return &errdefs.ExitError{ + ExitCode: uint32(status), + Err: err, } - return nil } type forwardIO struct { diff --git a/executor/runcexecutor/executor_common.go b/executor/runcexecutor/executor_common.go new file mode 100644 index 000000000000..729db008b544 --- /dev/null +++ b/executor/runcexecutor/executor_common.go @@ -0,0 +1,44 @@ +// +build !linux + +package runcexecutor + +import ( + "context" + "os/exec" + + "github.com/containerd/containerd" + runc "github.com/containerd/go-runc" + "github.com/moby/buildkit/executor" + "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" +) + +var unsupportedConsoleError = errors.New("tty for runc is only supported on linux") + +func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo) (int, error) { + if process.Meta.Tty { + return 0, unsupportedConsoleError + } + return w.runc.Run(ctx, id, bundle, &runc.CreateOpts{ + IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}, + NoPivot: w.noPivot, + }) +} + +func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess *specs.Process, process executor.ProcessInfo) (int, error) { + if process.Meta.Tty { + return 0, unsupportedConsoleError + } + err := w.runc.Exec(ctx, id, *specsProcess, &runc.ExecOpts{ + IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}, + }) + + var exitError *exec.ExitError + if errors.As(err, &exitError) { + return exitError.ExitCode(), err + } + if err != nil { + return containerd.UnknownExitStatus, err + } + return 0, nil +} diff --git a/executor/runcexecutor/executor_linux.go b/executor/runcexecutor/executor_linux.go new file mode 100644 index 000000000000..0daa8d23c676 --- /dev/null +++ b/executor/runcexecutor/executor_linux.go @@ -0,0 +1,206 @@ +package runcexecutor + +import ( + "bufio" + "context" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "strconv" + "strings" + "syscall" + "time" + + "github.com/containerd/console" + "github.com/containerd/containerd" + runc "github.com/containerd/go-runc" + "github.com/docker/docker/pkg/signal" + "github.com/moby/buildkit/executor" + "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" +) + +func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo) (int, error) { + return w.callWithIO(ctx, id, bundle, process, func(ctx context.Context, pidfile string, io runc.IO) (int, error) { + return w.runc.Run(ctx, id, bundle, &runc.CreateOpts{ + NoPivot: w.noPivot, + PidFile: pidfile, + IO: io, + }) + }) +} + +func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess *specs.Process, process executor.ProcessInfo) (int, error) { + return w.callWithIO(ctx, id, bundle, process, func(ctx context.Context, pidfile string, io runc.IO) (int, error) { + err := w.runc.Exec(ctx, id, *specsProcess, &runc.ExecOpts{ + PidFile: pidfile, + IO: io, + }) + var exitError *exec.ExitError + if errors.As(err, &exitError) { + return exitError.ExitCode(), err + } + if err != nil { + return containerd.UnknownExitStatus, err + } + return 0, nil + }) +} + +type runcCall func(ctx context.Context, pidfile string, io runc.IO) (int, error) + +func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, process executor.ProcessInfo, call runcCall) (int, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + pidfile, err := ioutil.TempFile(bundle, "*.pid") + if err != nil { + return containerd.UnknownExitStatus, errors.Wrap(err, "failed to create pidfile") + } + defer os.Remove(pidfile.Name()) + pidfile.Close() + + if !process.Meta.Tty { + return call(ctx, pidfile.Name(), &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}) + } + + ptm, ptsName, err := console.NewPty() + if err != nil { + return containerd.UnknownExitStatus, err + } + + pts, err := os.OpenFile(ptsName, os.O_RDWR|syscall.O_NOCTTY, 0) + if err != nil { + ptm.Close() + return containerd.UnknownExitStatus, err + } + + eg, ctx := errgroup.WithContext(ctx) + + defer func() { + if process.Stdin != nil { + process.Stdin.Close() + } + pts.Close() + ptm.Close() + cancel() // this will shutdown resize loop + err := eg.Wait() + if err != nil { + logrus.Warningf("error while shutting down tty io: %s", err) + } + }() + + if process.Stdin != nil { + eg.Go(func() error { + _, err := io.Copy(ptm, process.Stdin) + // stdin might be a pipe, so this is like EOF + if errors.Is(err, io.ErrClosedPipe) { + return nil + } + return err + }) + } + + if process.Stdout != nil { + eg.Go(func() error { + _, err := io.Copy(process.Stdout, ptm) + // ignore `read /dev/ptmx: input/output error` when ptm is closed + var ptmClosedError *os.PathError + if errors.As(err, &ptmClosedError) { + if ptmClosedError.Op == "read" && + ptmClosedError.Path == "/dev/ptmx" && + ptmClosedError.Err == syscall.EIO { + return nil + } + } + return err + }) + } + + eg.Go(func() error { + // need to poll until the pidfile has the pid written to it + pidfileCtx, timeout := context.WithTimeout(ctx, 10*time.Second) + defer timeout() + + var runcProcess *os.Process + for { + st, err := os.Stat(pidfile.Name()) + if err == nil && st.Size() > 0 { + pid, err := runc.ReadPidFile(pidfile.Name()) + if err != nil { + return errors.Wrapf(err, "unable to read pid file: %s", pidfile.Name()) + } + // pid will be for the process in process.Meta, not the parent runc process. + // We need to send SIGWINCH to the runc process, not the process.Meta process. + ppid, err := getppid(pid) + if err != nil { + return errors.Wrapf(err, "unable to find runc process (parent of %d)", pid) + } + runcProcess, err = os.FindProcess(ppid) + if err != nil { + return errors.Wrapf(err, "unable to find process for pid %d", ppid) + } + break + } + select { + case <-pidfileCtx.Done(): + return errors.New("pidfile never updated") + case <-time.After(100 * time.Microsecond): + } + } + for { + select { + case <-ctx.Done(): + return nil + case resize := <-process.Resize: + err = ptm.Resize(console.WinSize{ + Height: uint16(resize.Rows), + Width: uint16(resize.Cols), + }) + if err != nil { + logrus.Errorf("failed to resize ptm: %s", err) + } + err = runcProcess.Signal(signal.SIGWINCH) + if err != nil { + logrus.Errorf("failed to send SIGWINCH to process: %s", err) + } + } + } + }) + + runcIO := &forwardIO{} + if process.Stdin != nil { + runcIO.stdin = pts + } + if process.Stdout != nil { + runcIO.stdout = pts + } + if process.Stderr != nil { + runcIO.stderr = pts + } + + return call(ctx, pidfile.Name(), runcIO) +} + +const PPidStatusPrefix = "PPid:\t" + +func getppid(pid int) (int, error) { + fh, err := os.Open(fmt.Sprintf("/proc/%d/status", pid)) + if err != nil { + return -1, err + } + + defer fh.Close() + scanner := bufio.NewScanner(fh) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, PPidStatusPrefix) { + return strconv.Atoi(strings.TrimPrefix(line, PPidStatusPrefix)) + } + } + return -1, errors.Errorf("PPid line not found in /proc/%d/status", pid) +} diff --git a/frontend/gateway/grpcclient/client.go b/frontend/gateway/grpcclient/client.go index c60359615786..92e416cbaf5d 100644 --- a/frontend/gateway/grpcclient/client.go +++ b/frontend/gateway/grpcclient/client.go @@ -475,14 +475,15 @@ func (b *procMessageForwarder) Send(ctx context.Context, m *pb.ExecMessage) { } } -func (b *procMessageForwarder) Recv(ctx context.Context) *pb.ExecMessage { +func (b *procMessageForwarder) Recv(ctx context.Context) (m *pb.ExecMessage, ok bool) { select { case <-ctx.Done(): + return nil, true case <-b.done: - case m := <-b.msgs: - return m + return nil, false + case m = <-b.msgs: + return m, true } - return nil } func (b *procMessageForwarder) Close() { @@ -734,7 +735,7 @@ func (ctr *container) Start(ctx context.Context, req client.StartRequest) (clien return nil, err } - msg := msgs.Recv(ctx) + msg, _ := msgs.Recv(ctx) if msg == nil { return nil, errors.Errorf("failed to receive started message") } @@ -798,13 +799,24 @@ func (ctr *container) Start(ctx context.Context, req client.StartRequest) (clien } ctrProc.eg.Go(func() error { + var closeDoneOnce sync.Once var exitError error for { - msg := msgs.Recv(ctx) - if msg == nil { + msg, ok := msgs.Recv(ctx) + if !ok { + // no more messages, return return exitError } + if msg == nil { + // empty message from ctx cancel, so just start shutting down + // input, but continue processing more exit/done messages + closeDoneOnce.Do(func() { + close(done) + }) + continue + } + if file := msg.GetFile(); file != nil { var out io.WriteCloser switch file.Fd { @@ -826,7 +838,9 @@ func (ctr *container) Start(ctx context.Context, req client.StartRequest) (clien } else if exit := msg.GetExit(); exit != nil { // capture exit message to exitError so we can return it after // the server sends the Done message - close(done) + closeDoneOnce.Do(func() { + close(done) + }) if exit.Code == 0 { continue }