diff --git a/daemon/containerio/mem_buffer.go b/daemon/containerio/mem_buffer.go deleted file mode 100644 index aaa4bf99df..0000000000 --- a/daemon/containerio/mem_buffer.go +++ /dev/null @@ -1,42 +0,0 @@ -package containerio - -import ( - "bytes" - "io" -) - -func init() { - Register(func() Backend { - return &memBuffer{} - }) -} - -type memBuffer struct { - buffer *bytes.Buffer -} - -func (b *memBuffer) Name() string { - return "memBuffer" -} - -func (b *memBuffer) Init(opt *Option) error { - b.buffer = opt.memBuffer - return nil -} - -func (b *memBuffer) Out() io.Writer { - return b.buffer -} - -func (b *memBuffer) In() io.Reader { - return b.buffer -} - -func (b *memBuffer) Err() io.Writer { - return b.buffer -} - -func (b *memBuffer) Close() error { - // Don't need to close bytes.Buffer. - return nil -} diff --git a/daemon/containerio/options.go b/daemon/containerio/options.go index 2b3ca8e885..7da61cc7da 100644 --- a/daemon/containerio/options.go +++ b/daemon/containerio/options.go @@ -1,7 +1,7 @@ package containerio import ( - "bytes" + "io" "net/http" "os" @@ -18,7 +18,7 @@ type Option struct { hijack http.Hijacker hijackUpgrade bool stdinBackend string - memBuffer *bytes.Buffer + pipe *io.PipeWriter streams *remotecommand.Streams criLogFile *os.File } @@ -101,14 +101,14 @@ func WithStdinHijack() func(*Option) { } } -// WithMemBuffer specified the memory buffer backend. -func WithMemBuffer(memBuffer *bytes.Buffer) func(*Option) { +// WithPipe specified the pipe backend. +func WithPipe(pipe *io.PipeWriter) func(*Option) { return func(opt *Option) { if opt.backends == nil { opt.backends = make(map[string]struct{}) } - opt.backends["memBuffer"] = struct{}{} - opt.memBuffer = memBuffer + opt.backends["pipe"] = struct{}{} + opt.pipe = pipe } } diff --git a/daemon/containerio/pipe.go b/daemon/containerio/pipe.go new file mode 100644 index 0000000000..bca949ac6c --- /dev/null +++ b/daemon/containerio/pipe.go @@ -0,0 +1,41 @@ +package containerio + +import ( + "io" +) + +func init() { + Register(func() Backend { + return &pipe{} + }) +} + +type pipe struct { + pipeWriter *io.PipeWriter +} + +func (p *pipe) Name() string { + return "pipe" +} + +func (p *pipe) Init(opt *Option) error { + p.pipeWriter = opt.pipe + return nil +} + +func (p *pipe) Out() io.Writer { + return p.pipeWriter +} + +func (p *pipe) In() io.Reader { + return nil +} + +func (p *pipe) Err() io.Writer { + return p.pipeWriter +} + +func (p *pipe) Close() error { + return p.pipeWriter.Close() + +} diff --git a/daemon/mgr/container.go b/daemon/mgr/container.go index 71bea0b7c6..dbb853186e 100644 --- a/daemon/mgr/container.go +++ b/daemon/mgr/container.go @@ -1531,9 +1531,9 @@ func attachConfigToOptions(attach *AttachConfig) []func(*containerio.Option) { if attach.Stdin { options = append(options, containerio.WithStdinHijack()) } - } else if attach.MemBuffer != nil { - // Attaching using memory buffer. - options = append(options, containerio.WithMemBuffer(attach.MemBuffer)) + } else if attach.Pipe != nil { + // Attaching using pipe. + options = append(options, containerio.WithPipe(attach.Pipe)) } else if attach.Streams != nil { // Attaching using streams. options = append(options, containerio.WithStreams(attach.Streams)) diff --git a/daemon/mgr/container_types.go b/daemon/mgr/container_types.go index 135f35f68d..2de0c6335e 100644 --- a/daemon/mgr/container_types.go +++ b/daemon/mgr/container_types.go @@ -1,8 +1,8 @@ package mgr import ( - "bytes" "fmt" + "io" "net/http" "os" "sync" @@ -72,7 +72,7 @@ type AttachConfig struct { Upgrade bool // Attach using memory buffer. - MemBuffer *bytes.Buffer + Pipe *io.PipeWriter // Attach using streams. Streams *remotecommand.Streams diff --git a/daemon/mgr/cri.go b/daemon/mgr/cri.go index 5ccb98d367..97c909d606 100644 --- a/daemon/mgr/cri.go +++ b/daemon/mgr/cri.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "os" "path" "path/filepath" @@ -20,7 +21,7 @@ import ( "github.com/alibaba/pouch/pkg/utils" "github.com/alibaba/pouch/version" - // NOTE: "golang.org/x/net/context" is compatible with standard "context" in golang1.7+. + "github.com/containerd/containerd" "github.com/cri-o/ocicni/pkg/ocicni" "github.com/sirupsen/logrus" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -695,55 +696,73 @@ func (c *CriManager) UpdateContainerResources(ctx context.Context, r *runtime.Up // ExecSync executes a command in the container, and returns the stdout output. // If command exits with a non-zero exit code, an error is returned. func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) { - // TODO: handle timeout. id := r.GetContainerId() + timeout := time.Duration(r.GetTimeout()) * time.Second + var cancel context.CancelFunc + if timeout == 0 { + ctx, cancel = context.WithCancel(ctx) + } else { + ctx, cancel = context.WithTimeout(ctx, timeout) + } + defer cancel() + createConfig := &apitypes.ExecCreateConfig{ Cmd: r.GetCmd(), } - execid, err := c.ContainerMgr.CreateExec(ctx, id, createConfig) if err != nil { return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err) } - var output bytes.Buffer - startConfig := &apitypes.ExecStartConfig{} + reader, writer := io.Pipe() + defer writer.Close() + attachConfig := &AttachConfig{ Stdout: true, Stderr: true, - MemBuffer: &output, + Pipe: writer, MuxDisabled: true, } + startConfig := &apitypes.ExecStartConfig{} + err = c.ContainerMgr.StartExec(ctx, execid, startConfig, attachConfig) if err != nil { return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err) } - var execConfig *ContainerExecConfig - for { - execConfig, err = c.ContainerMgr.GetExecConfig(ctx, execid) + readWaitCh := make(chan error, 1) + var recv bytes.Buffer + go func() { + defer reader.Close() + _, err = io.Copy(&recv, reader) + if err != nil { + logrus.Infof("failed to read data from the pipe") + } + readWaitCh <- err + }() + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("timeout %v exceeded", timeout) + case <-readWaitCh: + execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid) if err != nil { return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err) } - // Loop until exec finished. - if !execConfig.Running { - break + + var stderr []byte + if execConfig.Error != nil { + stderr = []byte(execConfig.Error.Error()) } - time.Sleep(100 * time.Millisecond) - } - var stderr []byte - if execConfig.Error != nil { - stderr = []byte(execConfig.Error.Error()) + return &runtime.ExecSyncResponse{ + Stdout: recv.Bytes(), + Stderr: stderr, + ExitCode: int32(execConfig.ExitCode), + }, nil } - - return &runtime.ExecSyncResponse{ - Stdout: output.Bytes(), - Stderr: stderr, - ExitCode: int32(execConfig.ExitCode), - }, nil } // Exec prepares a streaming endpoint to execute a command in the container, and returns the address.