Skip to content

Commit

Permalink
Merge pull request #2590 from coryb/gateway-exec-signal
Browse files Browse the repository at this point in the history
Allow signals to be sent to gateway exec containers
  • Loading branch information
coryb authored Feb 3, 2022
2 parents 2ea7bed + 559d079 commit a640b47
Show file tree
Hide file tree
Showing 13 changed files with 784 additions and 200 deletions.
100 changes: 100 additions & 0 deletions client/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path/filepath"
"strconv"
"strings"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -52,6 +53,7 @@ func TestClientGatewayIntegration(t *testing.T) {
testClientGatewaySlowCacheExecError,
testClientGatewayExecFileActionError,
testClientGatewayContainerExtraHosts,
testClientGatewayContainerSignal,
testWarnings,
), integration.WithMirroredImages(integration.OfficialImages("busybox:latest")))

Expand Down Expand Up @@ -1894,6 +1896,104 @@ func testClientGatewayContainerHostNetworking(t *testing.T, sb integration.Sandb
checkAllReleasable(t, c, sb, true)
}

// testClientGatewayContainerSignal is testing that we can send a signal
func testClientGatewayContainerSignal(t *testing.T, sb integration.Sandbox) {
requiresLinux(t)
ctx := sb.Context()

c, err := New(ctx, sb.Address())
require.NoError(t, err)
defer c.Close()

product := "buildkit_test"

b := func(ctx context.Context, c client.Client) (*client.Result, error) {
ctx, timeout := context.WithTimeout(ctx, 10*time.Second)
defer timeout()

st := llb.Image("busybox:latest")

def, err := st.Marshal(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal state")
}

r, err := c.Solve(ctx, client.SolveRequest{
Definition: def.ToPB(),
})
if err != nil {
return nil, errors.Wrap(err, "failed to solve")
}

ctr1, err := c.NewContainer(ctx, client.NewContainerRequest{
Mounts: []client.Mount{{
Dest: "/",
MountType: pb.MountType_BIND,
Ref: r.Ref,
}},
})
require.NoError(t, err)
defer ctr1.Release(ctx)

pid1, err := ctr1.Start(ctx, client.StartRequest{
Args: []string{"sh", "-c", `trap 'kill $(jobs -p); exit 99' HUP; sleep 10 & wait`},
})
require.NoError(t, err)

// allow for the shell script to setup the trap before we signal it
time.Sleep(time.Second)

err = pid1.Signal(ctx, syscall.SIGHUP)
require.NoError(t, err)

err = pid1.Wait()
var exitError *gatewayapi.ExitError
require.ErrorAs(t, err, &exitError)
require.Equal(t, uint32(99), exitError.ExitCode)

// Now try again to signal an exec process

ctr2, err := c.NewContainer(ctx, client.NewContainerRequest{
Mounts: []client.Mount{{
Dest: "/",
MountType: pb.MountType_BIND,
Ref: r.Ref,
}},
})
require.NoError(t, err)
defer ctr2.Release(ctx)

pid1, err = ctr2.Start(ctx, client.StartRequest{
Args: []string{"sleep", "10"},
})
require.NoError(t, err)

pid2, err := ctr2.Start(ctx, client.StartRequest{
Args: []string{"sh", "-c", `trap 'kill $(jobs -p); exit 111' INT; sleep 10 & wait`},
})
require.NoError(t, err)

// allow for the shell script to setup the trap before we signal it
time.Sleep(time.Second)

err = pid2.Signal(ctx, syscall.SIGINT)
require.NoError(t, err)

err = pid2.Wait()
require.ErrorAs(t, err, &exitError)
require.Equal(t, uint32(111), exitError.ExitCode)

pid1.Signal(ctx, syscall.SIGKILL)
pid1.Wait()
return &client.Result{}, err
}

_, err = c.Build(ctx, SolveOpt{}, product, b, nil)
require.Error(t, err)

checkAllReleasable(t, c, sb, true)
}

type nopCloser struct {
io.Writer
}
Expand Down
36 changes: 26 additions & 10 deletions executor/containerdexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M
}
}()

err = w.runProcess(ctx, task, process.Resize, func() {
err = w.runProcess(ctx, task, process.Resize, process.Signal, func() {
startedOnce.Do(func() {
if started != nil {
close(started)
Expand Down Expand Up @@ -293,7 +293,7 @@ func (w *containerdExecutor) Exec(ctx context.Context, id string, process execut
return errors.WithStack(err)
}

err = w.runProcess(ctx, taskProcess, process.Resize, nil)
err = w.runProcess(ctx, taskProcess, process.Resize, process.Signal, nil)
return err
}

Expand All @@ -310,7 +310,7 @@ func fixProcessOutput(process *executor.ProcessInfo) {
}
}

func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Process, resize <-chan executor.WinSize, started func()) error {
func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Process, resize <-chan executor.WinSize, signal <-chan syscall.Signal, started func()) error {
// Not using `ctx` here because the context passed only affects the statusCh which we
// don't want cancelled when ctx.Done is sent. We want to process statusCh on cancel.
statusCh, err := p.Wait(context.Background())
Expand All @@ -335,22 +335,38 @@ func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Proces

p.CloseIO(ctx, containerd.WithStdinCloser)

// resize in separate go loop so it does not potentially block
// the container cancel/exit status loop below.
resizeCtx, resizeCancel := context.WithCancel(ctx)
defer resizeCancel()
// handle signals (and resize) in separate go loop so it does not
// potentially block the container cancel/exit status loop below.
eventCtx, eventCancel := context.WithCancel(ctx)
defer eventCancel()
go func() {
for {
select {
case <-resizeCtx.Done():
case <-eventCtx.Done():
return
case size, ok := <-resize:
if !ok {
return // chan closed
}
err = p.Resize(resizeCtx, size.Cols, size.Rows)
err = p.Resize(eventCtx, size.Cols, size.Rows)
if err != nil {
bklog.G(resizeCtx).Warnf("Failed to resize %s: %s", p.ID(), err)
bklog.G(eventCtx).Warnf("Failed to resize %s: %s", p.ID(), err)
}
}
}
}()
go func() {
for {
select {
case <-eventCtx.Done():
return
case sig, ok := <-signal:
if !ok {
return // chan closed
}
err = p.Kill(eventCtx, sig)
if err != nil {
bklog.G(eventCtx).Warnf("Failed to signal %s: %s", p.ID(), err)
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"net"
"syscall"

"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver/pb"
Expand Down Expand Up @@ -45,6 +46,7 @@ type ProcessInfo struct {
Stdin io.ReadCloser
Stdout, Stderr io.WriteCloser
Resize <-chan WinSize
Signal <-chan syscall.Signal
}

type Executor interface {
Expand Down
89 changes: 82 additions & 7 deletions executor/runcexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,14 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
}()

bklog.G(ctx).Debugf("> creating %s %v", id, meta.Args)
// this is a cheat, we have not actually started, but as close as we can get with runc for now
if started != nil {

err = w.run(runCtx, id, bundle, process, func() {
startedOnce.Do(func() {
close(started)
if started != nil {
close(started)
}
})
}

err = w.run(runCtx, id, bundle, process)
})
close(ended)
return exitError(ctx, err)
}
Expand Down Expand Up @@ -414,7 +414,7 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro
spec.Process.Env = process.Meta.Env
}

err = w.exec(ctx, id, state.Bundle, spec.Process, process)
err = w.exec(ctx, id, state.Bundle, spec.Process, process, nil)
return exitError(ctx, err)
}

Expand Down Expand Up @@ -444,3 +444,78 @@ func (s *forwardIO) Stdout() io.ReadCloser {
func (s *forwardIO) Stderr() io.ReadCloser {
return nil
}

// startingProcess is to track the os process so we can send signals to it.
type startingProcess struct {
Process *os.Process
ready chan struct{}
}

// Release will free resources with a startingProcess.
func (p *startingProcess) Release() {
if p.Process != nil {
p.Process.Release()
}
}

// WaitForReady will wait until the Process has been populated or the
// provided context was cancelled. This should be called before using
// the Process field.
func (p *startingProcess) WaitForReady(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-p.ready:
return nil
}
}

// WaitForStart will record the pid reported by Runc via the channel.
// We wait for up to 10s for the runc process to start. If the started
// callback is non-nil it will be called after receiving the pid.
func (p *startingProcess) WaitForStart(ctx context.Context, startedCh <-chan int, started func()) error {
startedCtx, timeout := context.WithTimeout(ctx, 10*time.Second)
defer timeout()
var err error
select {
case <-startedCtx.Done():
return errors.New("runc started message never received")
case pid, ok := <-startedCh:
if !ok {
return errors.New("runc process failed to send pid")
}
if started != nil {
started()
}
p.Process, err = os.FindProcess(pid)
if err != nil {
return errors.Wrapf(err, "unable to find runc process for pid %d", pid)
}
close(p.ready)
}
return nil
}

// handleSignals will wait until the runcProcess is ready then will
// send each signal received on the channel to the process.
func handleSignals(ctx context.Context, runcProcess *startingProcess, signals <-chan syscall.Signal) error {
if signals == nil {
return nil
}
err := runcProcess.WaitForReady(ctx)
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return nil
case sig := <-signals:
err := runcProcess.Process.Signal(sig)
if err != nil {
bklog.G(ctx).Errorf("failed to signal %s to process: %s", sig, err)
return err
}
}
}
}
51 changes: 43 additions & 8 deletions executor/runcexecutor/executor_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,63 @@ import (
"github.com/moby/buildkit/executor"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

var unsupportedConsoleError = errors.New("tty for runc is only supported on linux")

func updateRuncFieldsForHostOS(runtime *runc.Runc) {}

func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo) error {
func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func()) error {
if process.Meta.Tty {
return unsupportedConsoleError
}
_, err := w.runc.Run(ctx, id, bundle, &runc.CreateOpts{
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
NoPivot: w.noPivot,
return w.commonCall(ctx, id, bundle, process, started, func(ctx context.Context, started chan<- int, io runc.IO) error {
_, err := w.runc.Run(ctx, id, bundle, &runc.CreateOpts{
NoPivot: w.noPivot,
Started: started,
IO: io,
})
return err
})
return err
}

func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess *specs.Process, process executor.ProcessInfo) error {
func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess *specs.Process, process executor.ProcessInfo, started func()) error {
if process.Meta.Tty {
return unsupportedConsoleError
}
return w.runc.Exec(ctx, id, *specsProcess, &runc.ExecOpts{
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
return w.commonCall(ctx, id, bundle, process, started, func(ctx context.Context, started chan<- int, io runc.IO) error {
return w.runc.Exec(ctx, id, *specsProcess, &runc.ExecOpts{
Started: started,
IO: io,
})
})
}

type runcCall func(ctx context.Context, started chan<- int, io runc.IO) error

// commonCall is the common run/exec logic used for non-linux runtimes. A tty
// is only supported for linux, so this really just handles signal propagation
// to the started runc process.
func (w *runcExecutor) commonCall(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), call runcCall) error {
runcProcess := &startingProcess{
ready: make(chan struct{}),
}
defer runcProcess.Release()

var eg errgroup.Group
egCtx, cancel := context.WithCancel(ctx)
defer eg.Wait()
defer cancel()

startedCh := make(chan int, 1)
eg.Go(func() error {
return runcProcess.WaitForStart(egCtx, startedCh, started)
})

eg.Go(func() error {
return handleSignals(egCtx, runcProcess, process.Signal)
})

return call(ctx, startedCh, &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr})
}
Loading

0 comments on commit a640b47

Please sign in to comment.