Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

monitor: Enable to exec into the container #1626

Merged
merged 1 commit into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 0 additions & 145 deletions build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,151 +660,6 @@ func toSolveOpt(ctx context.Context, node builder.Node, multiDriver bool, opt Op
return &so, releaseF, nil
}

// ContainerConfig is configuration for a container to run.
type ContainerConfig struct {
ResultCtx *ResultContext

Stdin io.ReadCloser
Stdout io.WriteCloser
Stderr io.WriteCloser
Tty bool

Entrypoint []string
Cmd []string
Env []string
User *string
Cwd *string
}

// ResultContext is a build result with the client that built it.
type ResultContext struct {
Client *client.Client
Res *gateway.Result
}

// Invoke invokes a build result as a container.
func Invoke(ctx context.Context, cfg ContainerConfig) error {
if cfg.ResultCtx == nil {
return errors.Errorf("result must be provided")
}
c, res := cfg.ResultCtx.Client, cfg.ResultCtx.Res

mainCtx := ctx

_, err := c.Build(context.TODO(), client.SolveOpt{}, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
ctx, cancel := context.WithCancel(ctx)
go func() {
<-mainCtx.Done()
cancel()
}()

if res.Ref == nil {
return nil, errors.Errorf("no reference is registered")
}
st, err := res.Ref.ToState()
if err != nil {
return nil, err
}
def, err := st.Marshal(ctx)
if err != nil {
return nil, err
}
imgRef, err := c.Solve(ctx, gateway.SolveRequest{
Definition: def.ToPB(),
})
if err != nil {
return nil, err
}
ctr, err := c.NewContainer(ctx, gateway.NewContainerRequest{
Mounts: []gateway.Mount{
{
Dest: "/",
MountType: pb.MountType_BIND,
Ref: imgRef.Ref,
},
},
})
if err != nil {
return nil, err
}
defer ctr.Release(context.TODO())

imgData := res.Metadata[exptypes.ExporterImageConfigKey]
var img *specs.Image
if len(imgData) > 0 {
img = &specs.Image{}
if err := json.Unmarshal(imgData, img); err != nil {
fmt.Println(err)
return nil, err
}
}

user := ""
if cfg.User != nil {
user = *cfg.User
} else if img != nil {
user = img.Config.User
}

cwd := ""
if cfg.Cwd != nil {
cwd = *cfg.Cwd
} else if img != nil {
cwd = img.Config.WorkingDir
}

env := []string{}
if img != nil {
env = append(env, img.Config.Env...)
}
env = append(env, cfg.Env...)

args := []string{}
if cfg.Entrypoint != nil {
args = append(args, cfg.Entrypoint...)
} else if img != nil {
args = append(args, img.Config.Entrypoint...)
}
if cfg.Cmd != nil {
args = append(args, cfg.Cmd...)
} else if img != nil {
args = append(args, img.Config.Cmd...)
}

proc, err := ctr.Start(ctx, gateway.StartRequest{
Args: args,
Env: env,
User: user,
Cwd: cwd,
Tty: cfg.Tty,
Stdin: cfg.Stdin,
Stdout: cfg.Stdout,
Stderr: cfg.Stderr,
})
if err != nil {
return nil, errors.Errorf("failed to start container: %v", err)
}
errCh := make(chan error)
doneCh := make(chan struct{})
go func() {
if err := proc.Wait(); err != nil {
errCh <- err
return
}
close(doneCh)
}()
select {
case <-doneCh:
case <-ctx.Done():
return nil, ctx.Err()
case err := <-errCh:
return nil, err
}
return nil, nil
}, nil)
return err
}

func Build(ctx context.Context, nodes []builder.Node, opt map[string]Options, docker *dockerutil.Client, configDir string, w progress.Writer) (resp map[string]*client.SolveResponse, err error) {
return BuildWithResultHandler(ctx, nodes, opt, docker, configDir, w, nil)
}
Expand Down
216 changes: 216 additions & 0 deletions build/invoke.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package build

import (
"context"
_ "crypto/sha256" // ensure digests can be computed
"encoding/json"
"fmt"
"io"
"sync"
"sync/atomic"
"syscall"

controllerapi "github.com/docker/buildx/controller/pb"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/exporter/containerimage/exptypes"
gateway "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/solver/pb"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// ResultContext is a build result with the client that built it.
type ResultContext struct {
Client *client.Client
Res *gateway.Result
}

type Container struct {
cancelOnce sync.Once
containerCancel func()

isUnavailable atomic.Bool

initStarted atomic.Bool

container gateway.Container
image *specs.Image

releaseCh chan struct{}
}

func NewContainer(ctx context.Context, resultCtx *ResultContext) (*Container, error) {
c, res := resultCtx.Client, resultCtx.Res

mainCtx := ctx

ctrCh := make(chan *Container)
errCh := make(chan error)
go func() {
_, err := c.Build(context.TODO(), client.SolveOpt{}, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
ctx, cancel := context.WithCancel(ctx)
go func() {
<-mainCtx.Done()
cancel()
}()

if res.Ref == nil {
return nil, errors.Errorf("no reference is registered")
}
st, err := res.Ref.ToState()
if err != nil {
return nil, err
}
def, err := st.Marshal(ctx)
if err != nil {
return nil, err
}
imgRef, err := c.Solve(ctx, gateway.SolveRequest{
Definition: def.ToPB(),
})
if err != nil {
return nil, err
}
containerCtx, containerCancel := context.WithCancel(ctx)
defer containerCancel()
bkContainer, err := c.NewContainer(containerCtx, gateway.NewContainerRequest{
Mounts: []gateway.Mount{
{
Dest: "/",
MountType: pb.MountType_BIND,
Ref: imgRef.Ref,
},
},
})
if err != nil {
return nil, err
}
imgData := res.Metadata[exptypes.ExporterImageConfigKey]
var img *specs.Image
if len(imgData) > 0 {
img = &specs.Image{}
if err := json.Unmarshal(imgData, img); err != nil {
fmt.Println(err)
return nil, err
}
}
releaseCh := make(chan struct{})
container := &Container{
containerCancel: containerCancel,
container: bkContainer,
image: img,
releaseCh: releaseCh,
}
ctrCh <- container
<-container.releaseCh

return nil, bkContainer.Release(ctx)
}, nil)
if err != nil {
errCh <- err
}
}()
select {
case ctr := <-ctrCh:
return ctr, nil
case err := <-errCh:
return nil, err
case <-mainCtx.Done():
return nil, mainCtx.Err()
}
}

func (c *Container) Cancel() {
c.markUnavailable()
c.cancelOnce.Do(func() {
if c.containerCancel != nil {
c.containerCancel()
}
close(c.releaseCh)
})
}

func (c *Container) IsUnavailable() bool {
return c.isUnavailable.Load()
}

func (c *Container) markUnavailable() {
c.isUnavailable.Store(true)
}

func (c *Container) Exec(ctx context.Context, cfg *controllerapi.InvokeConfig, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error {
if isInit := c.initStarted.CompareAndSwap(false, true); isInit {
defer func() {
// container can't be used after init exits
c.markUnavailable()
}()
}
err := exec(ctx, cfg, c.container, c.image, stdin, stdout, stderr)
if err != nil {
// Container becomes unavailable if one of the processes fails in it.
c.markUnavailable()
}
return err
}

func exec(ctx context.Context, cfg *controllerapi.InvokeConfig, ctr gateway.Container, img *specs.Image, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error {
user := ""
if !cfg.NoUser {
user = cfg.User
} else if img != nil {
user = img.Config.User
}

cwd := ""
if !cfg.NoCwd {
cwd = cfg.Cwd
} else if img != nil {
cwd = img.Config.WorkingDir
}

env := []string{}
if img != nil {
env = append(env, img.Config.Env...)
}
env = append(env, cfg.Env...)

args := []string{}
if cfg.Entrypoint != nil {
args = append(args, cfg.Entrypoint...)
}
if cfg.Cmd != nil {
args = append(args, cfg.Cmd...)
}
// caller should always set args
if len(args) == 0 {
return errors.Errorf("specify args to execute")
}
proc, err := ctr.Start(ctx, gateway.StartRequest{
Args: args,
Env: env,
User: user,
Cwd: cwd,
Tty: cfg.Tty,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
})
if err != nil {
return errors.Errorf("failed to start container: %v", err)
}

doneCh := make(chan struct{})
defer close(doneCh)
go func() {
select {
case <-ctx.Done():
if err := proc.Signal(ctx, syscall.SIGKILL); err != nil {
logrus.Warnf("failed to kill process: %v", err)
}
case <-doneCh:
}
}()

return proc.Wait()
}
6 changes: 3 additions & 3 deletions commands/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,10 +469,10 @@ func launchControllerAndRunBuild(dockerCli command.Cli, options buildOptions) er
// stdin must be usable for monitor
return errors.Errorf("Dockerfile or context from stdin is not supported with invoke")
}
var invokeConfig controllerapi.ContainerConfig
var invokeConfig controllerapi.InvokeConfig
if inv := options.invoke; inv != "" {
var err error
invokeConfig, err = parseInvokeConfig(inv) // TODO: produce *controller.ContainerConfig directly.
invokeConfig, err = parseInvokeConfig(inv)
if err != nil {
return err
}
Expand Down Expand Up @@ -566,7 +566,7 @@ func launchControllerAndRunBuild(dockerCli command.Cli, options buildOptions) er
return nil
}

func parseInvokeConfig(invoke string) (cfg controllerapi.ContainerConfig, err error) {
func parseInvokeConfig(invoke string) (cfg controllerapi.InvokeConfig, err error) {
cfg.Tty = true
if invoke == "default" {
return cfg, nil
Expand Down
Loading