diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index 213ebb73665a..19f7fbd812d6 100644 --- a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -92,7 +92,7 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex root := opt.Root - if err := os.MkdirAll(root, 0711); err != nil { + if err := os.MkdirAll(root, 0o711); err != nil { return nil, errors.Wrapf(err, "failed to create %s", root) } @@ -205,7 +205,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, } bundle := filepath.Join(w.root, id) - if err := os.Mkdir(bundle, 0711); err != nil { + if err := os.Mkdir(bundle, 0o711); err != nil { return err } defer os.RemoveAll(bundle) @@ -216,7 +216,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, } rootFSPath := filepath.Join(bundle, "rootfs") - if err := idtools.MkdirAllAndChown(rootFSPath, 0700, identity); err != nil { + if err := idtools.MkdirAllAndChown(rootFSPath, 0o700, identity); err != nil { return err } if err := mount.All(rootMount, rootFSPath); err != nil { @@ -270,7 +270,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, return errors.Wrapf(err, "working dir %s points to invalid target", newp) } if _, err := os.Stat(newp); err != nil { - if err := idtools.MkdirAllAndChown(newp, 0755, identity); err != nil { + if err := idtools.MkdirAllAndChown(newp, 0o755, identity); err != nil { return errors.Wrapf(err, "failed to create working directory %s", newp) } } @@ -287,42 +287,10 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, return err } - // runCtx/killCtx is used for extra check in case the kill command blocks - runCtx, cancelRun := context.WithCancel(context.Background()) - defer cancelRun() - - ended := make(chan struct{}) - go func() { - for { - select { - case <-ctx.Done(): - killCtx, timeout := context.WithTimeout(context.Background(), 7*time.Second) - if err := w.runc.Kill(killCtx, id, int(syscall.SIGKILL), nil); err != nil { - bklog.G(ctx).Errorf("failed to kill runc %s: %+v", id, err) - select { - case <-killCtx.Done(): - timeout() - cancelRun() - return - default: - } - } - timeout() - select { - case <-time.After(50 * time.Millisecond): - case <-ended: - return - } - case <-ended: - return - } - } - }() - bklog.G(ctx).Debugf("> creating %s %v", id, meta.Args) trace.SpanFromContext(ctx).AddEvent("Container created") - err = w.run(runCtx, id, bundle, process, func() { + err = w.run(ctx, id, bundle, process, func() { startedOnce.Do(func() { trace.SpanFromContext(ctx).AddEvent("Container started") if started != nil { @@ -330,7 +298,6 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, } }) }) - close(ended) return exitError(ctx, err) } @@ -462,23 +429,87 @@ func (s *forwardIO) Stderr() io.ReadCloser { return nil } -// startingProcess is to track the os process so we can send signals to it. -type startingProcess struct { - Process *os.Process - ready chan struct{} +// procHandle is to track the os process so we can send signals to it. +type procHandle struct { + Process *os.Process + ready chan struct{} + ended chan struct{} + shutdown func() } -// Release will free resources with a startingProcess. -func (p *startingProcess) Release() { +// runcProcessHandle will create a procHandle that will be monitored, where +// on ctx.Done the process will be killed. If the kill fails, then the cancel +// will be called. This is to allow for runc to go through its normal shutdown +// procedure if the ctx is canceled and to ensure there are no zombie processes +// left by runc. +func runcProcessHandle(ctx context.Context, id string) (*procHandle, context.Context) { + runcCtx, cancel := context.WithCancel(context.Background()) + p := &procHandle{ + ready: make(chan struct{}), + ended: make(chan struct{}), + shutdown: cancel, + } + // preserve the logger on the context used for the runc process handling + runcCtx = bklog.WithLogger(runcCtx, bklog.G(ctx)) + + go func() { + // Wait for pid + select { + case <-ctx.Done(): + return // nothing to kill + case <-p.ready: + } + + for { + select { + case <-ctx.Done(): + killCtx, timeout := context.WithTimeout(context.Background(), 7*time.Second) + if err := p.Process.Kill(); err != nil { + bklog.G(ctx).Errorf("failed to kill runc %s: %+v", id, err) + select { + case <-killCtx.Done(): + timeout() + cancel() + return + default: + } + } + timeout() + select { + case <-time.After(50 * time.Millisecond): + case <-p.ended: + return + } + case <-p.ended: + return + } + } + }() + + return p, runcCtx +} + +// Release will free resources with a procHandle. +func (p *procHandle) Release() { + close(p.ended) if p.Process != nil { p.Process.Release() } } +// Shutdown should be called after the runc process has exited. This will allow +// the signal handling and tty resize loops to exit, terminating the +// goroutines. +func (p *procHandle) Shutdown() { + if p.shutdown != nil { + p.shutdown() + } +} + // WaitForReady will wait until the Process has been populated or the // provided context was cancelled. This should be called before using // the Process field. -func (p *startingProcess) WaitForReady(ctx context.Context) error { +func (p *procHandle) WaitForReady(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() @@ -490,7 +521,7 @@ func (p *startingProcess) WaitForReady(ctx context.Context) error { // WaitForStart will record the pid reported by Runc via the channel. // We wait for up to 10s for the runc process to start. If the started // callback is non-nil it will be called after receiving the pid. -func (p *startingProcess) WaitForStart(ctx context.Context, startedCh <-chan int, started func()) error { +func (p *procHandle) WaitForStart(ctx context.Context, startedCh <-chan int, started func()) error { startedCtx, timeout := context.WithTimeout(ctx, 10*time.Second) defer timeout() var err error @@ -515,7 +546,7 @@ func (p *startingProcess) WaitForStart(ctx context.Context, startedCh <-chan int // handleSignals will wait until the runcProcess is ready then will // send each signal received on the channel to the process. -func handleSignals(ctx context.Context, runcProcess *startingProcess, signals <-chan syscall.Signal) error { +func handleSignals(ctx context.Context, runcProcess *procHandle, signals <-chan syscall.Signal) error { if signals == nil { return nil } diff --git a/executor/runcexecutor/executor_common.go b/executor/runcexecutor/executor_common.go index 447c4a96b958..44c696fff10b 100644 --- a/executor/runcexecutor/executor_common.go +++ b/executor/runcexecutor/executor_common.go @@ -49,23 +49,20 @@ type runcCall func(ctx context.Context, started chan<- int, io runc.IO) error // is only supported for linux, so this really just handles signal propagation // to the started runc process. func (w *runcExecutor) commonCall(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), call runcCall) error { - runcProcess := &startingProcess{ - ready: make(chan struct{}), - } + runcProcess, ctx := runcProcessHandle(ctx, id) defer runcProcess.Release() - var eg errgroup.Group - egCtx, cancel := context.WithCancel(ctx) + eg, ctx := errgroup.WithContext(ctx) defer eg.Wait() - defer cancel() + defer runcProcess.Shutdown() startedCh := make(chan int, 1) eg.Go(func() error { - return runcProcess.WaitForStart(egCtx, startedCh, started) + return runcProcess.WaitForStart(ctx, startedCh, started) }) eg.Go(func() error { - return handleSignals(egCtx, runcProcess, process.Signal) + return handleSignals(ctx, runcProcess, process.Signal) }) return call(ctx, startedCh, &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}) diff --git a/executor/runcexecutor/executor_linux.go b/executor/runcexecutor/executor_linux.go index 15ea812a5a23..dbf73069274e 100644 --- a/executor/runcexecutor/executor_linux.go +++ b/executor/runcexecutor/executor_linux.go @@ -44,23 +44,20 @@ func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess type runcCall func(ctx context.Context, started chan<- int, io runc.IO) error func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), call runcCall) error { - runcProcess := &startingProcess{ - ready: make(chan struct{}), - } + runcProcess, ctx := runcProcessHandle(ctx, id) defer runcProcess.Release() - var eg errgroup.Group - egCtx, cancel := context.WithCancel(ctx) + eg, ctx := errgroup.WithContext(ctx) defer eg.Wait() - defer cancel() + defer runcProcess.Shutdown() startedCh := make(chan int, 1) eg.Go(func() error { - return runcProcess.WaitForStart(egCtx, startedCh, started) + return runcProcess.WaitForStart(ctx, startedCh, started) }) eg.Go(func() error { - return handleSignals(egCtx, runcProcess, process.Signal) + return handleSignals(ctx, runcProcess, process.Signal) }) if !process.Meta.Tty { @@ -84,7 +81,7 @@ func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, proces } pts.Close() ptm.Close() - cancel() // this will shutdown resize and signal loops + runcProcess.Shutdown() err := eg.Wait() if err != nil { bklog.G(ctx).Warningf("error while shutting down tty io: %s", err) @@ -119,13 +116,13 @@ func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, proces } eg.Go(func() error { - err := runcProcess.WaitForReady(egCtx) + err := runcProcess.WaitForReady(ctx) if err != nil { return err } for { select { - case <-egCtx.Done(): + case <-ctx.Done(): return nil case resize := <-process.Resize: err = ptm.Resize(console.WinSize{