From 13536d080d6b5299b4d62d7b8d3e706098e3d3b7 Mon Sep 17 00:00:00 2001 From: Zou Rui <21751189@zju.edu.cn> Date: Tue, 15 May 2018 11:03:48 +0800 Subject: [PATCH] feature: add timeout handler for execSync in cri part Signed-off-by: Zou Rui <21751189@zju.edu.cn> --- cri/src/cri.go | 64 +++++++++++++++++++++----------- daemon/containerio/mem_buffer.go | 42 --------------------- daemon/containerio/options.go | 12 +++--- daemon/containerio/pipe.go | 40 ++++++++++++++++++++ daemon/mgr/container.go | 6 +-- daemon/mgr/container_types.go | 6 +-- 6 files changed, 94 insertions(+), 76 deletions(-) delete mode 100644 daemon/containerio/mem_buffer.go create mode 100644 daemon/containerio/pipe.go diff --git a/cri/src/cri.go b/cri/src/cri.go index ad692074e..5cb3fdf75 100644 --- a/cri/src/cri.go +++ b/cri/src/cri.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "os" "path" "path/filepath" @@ -696,55 +697,74 @@ func (c *CriManager) UpdateContainerResources(ctx context.Context, r *runtime.Up // ExecSync executes a command in the container, and returns the stdout output. // If command exits with a non-zero exit code, an error is returned. func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) { - // TODO: handle timeout. id := r.GetContainerId() + timeout := time.Duration(r.GetTimeout()) * time.Second + var cancel context.CancelFunc + if timeout == 0 { + ctx, cancel = context.WithCancel(ctx) + } else { + ctx, cancel = context.WithTimeout(ctx, timeout) + } + defer cancel() + createConfig := &apitypes.ExecCreateConfig{ Cmd: r.GetCmd(), } - execid, err := c.ContainerMgr.CreateExec(ctx, id, createConfig) if err != nil { return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err) } - var output bytes.Buffer - startConfig := &apitypes.ExecStartConfig{} + reader, writer := io.Pipe() + defer writer.Close() + attachConfig := &mgr.AttachConfig{ Stdout: true, Stderr: true, - MemBuffer: &output, + Pipe: writer, MuxDisabled: true, } + startConfig := &apitypes.ExecStartConfig{} + err = c.ContainerMgr.StartExec(ctx, execid, startConfig, attachConfig) if err != nil { return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err) } - var execConfig *mgr.ContainerExecConfig - for { - execConfig, err = c.ContainerMgr.GetExecConfig(ctx, execid) + readWaitCh := make(chan error, 1) + var recv bytes.Buffer + go func() { + defer reader.Close() + _, err = io.Copy(&recv, reader) + readWaitCh <- err + }() + + select { + case <-ctx.Done(): + //TODO maybe stop the execution? + return nil, fmt.Errorf("timeout %v exceeded", timeout) + case readWaitErr := <-readWaitCh: + if readWaitErr != nil { + return nil, fmt.Errorf("failed to read data from the pipe: %v", err) + } + execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid) if err != nil { return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err) } - // Loop until exec finished. - if !execConfig.Running { - break + + var stderr []byte + if execConfig.Error != nil { + stderr = []byte(execConfig.Error.Error()) } - time.Sleep(100 * time.Millisecond) - } - var stderr []byte - if execConfig.Error != nil { - stderr = []byte(execConfig.Error.Error()) + return &runtime.ExecSyncResponse{ + Stdout: recv.Bytes(), + Stderr: stderr, + ExitCode: int32(execConfig.ExitCode), + }, nil } - - return &runtime.ExecSyncResponse{ - Stdout: output.Bytes(), - Stderr: stderr, - ExitCode: int32(execConfig.ExitCode), - }, nil } // Exec prepares a streaming endpoint to execute a command in the container, and returns the address. diff --git a/daemon/containerio/mem_buffer.go b/daemon/containerio/mem_buffer.go deleted file mode 100644 index aaa4bf99d..000000000 --- a/daemon/containerio/mem_buffer.go +++ /dev/null @@ -1,42 +0,0 @@ -package containerio - -import ( - "bytes" - "io" -) - -func init() { - Register(func() Backend { - return &memBuffer{} - }) -} - -type memBuffer struct { - buffer *bytes.Buffer -} - -func (b *memBuffer) Name() string { - return "memBuffer" -} - -func (b *memBuffer) Init(opt *Option) error { - b.buffer = opt.memBuffer - return nil -} - -func (b *memBuffer) Out() io.Writer { - return b.buffer -} - -func (b *memBuffer) In() io.Reader { - return b.buffer -} - -func (b *memBuffer) Err() io.Writer { - return b.buffer -} - -func (b *memBuffer) Close() error { - // Don't need to close bytes.Buffer. - return nil -} diff --git a/daemon/containerio/options.go b/daemon/containerio/options.go index ed3558f9b..58b085460 100644 --- a/daemon/containerio/options.go +++ b/daemon/containerio/options.go @@ -1,7 +1,7 @@ package containerio import ( - "bytes" + "io" "net/http" "os" @@ -18,7 +18,7 @@ type Option struct { hijack http.Hijacker hijackUpgrade bool stdinBackend string - memBuffer *bytes.Buffer + pipe *io.PipeWriter streams *remotecommand.Streams criLogFile *os.File } @@ -101,14 +101,14 @@ func WithStdinHijack() func(*Option) { } } -// WithMemBuffer specified the memory buffer backend. -func WithMemBuffer(memBuffer *bytes.Buffer) func(*Option) { +// WithPipe specified the pipe backend. +func WithPipe(pipe *io.PipeWriter) func(*Option) { return func(opt *Option) { if opt.backends == nil { opt.backends = make(map[string]struct{}) } - opt.backends["memBuffer"] = struct{}{} - opt.memBuffer = memBuffer + opt.backends["pipe"] = struct{}{} + opt.pipe = pipe } } diff --git a/daemon/containerio/pipe.go b/daemon/containerio/pipe.go new file mode 100644 index 000000000..25b5bf6a5 --- /dev/null +++ b/daemon/containerio/pipe.go @@ -0,0 +1,40 @@ +package containerio + +import ( + "io" +) + +func init() { + Register(func() Backend { + return &pipe{} + }) +} + +type pipe struct { + pipeWriter *io.PipeWriter +} + +func (p *pipe) Name() string { + return "pipe" +} + +func (p *pipe) Init(opt *Option) error { + p.pipeWriter = opt.pipe + return nil +} + +func (p *pipe) Out() io.Writer { + return p.pipeWriter +} + +func (p *pipe) In() io.Reader { + return nil +} + +func (p *pipe) Err() io.Writer { + return p.pipeWriter +} + +func (p *pipe) Close() error { + return p.pipeWriter.Close() +} diff --git a/daemon/mgr/container.go b/daemon/mgr/container.go index 668b17c24..2dcb1c53d 100644 --- a/daemon/mgr/container.go +++ b/daemon/mgr/container.go @@ -1544,9 +1544,9 @@ func attachConfigToOptions(attach *AttachConfig) []func(*containerio.Option) { if attach.Stdin { options = append(options, containerio.WithStdinHijack()) } - } else if attach.MemBuffer != nil { - // Attaching using memory buffer. - options = append(options, containerio.WithMemBuffer(attach.MemBuffer)) + } else if attach.Pipe != nil { + // Attaching using pipe. + options = append(options, containerio.WithPipe(attach.Pipe)) } else if attach.Streams != nil { // Attaching using streams. options = append(options, containerio.WithStreams(attach.Streams)) diff --git a/daemon/mgr/container_types.go b/daemon/mgr/container_types.go index 135f35f68..6976961ae 100644 --- a/daemon/mgr/container_types.go +++ b/daemon/mgr/container_types.go @@ -1,8 +1,8 @@ package mgr import ( - "bytes" "fmt" + "io" "net/http" "os" "sync" @@ -71,8 +71,8 @@ type AttachConfig struct { Hijack http.Hijacker Upgrade bool - // Attach using memory buffer. - MemBuffer *bytes.Buffer + // Attach using pipe. + Pipe *io.PipeWriter // Attach using streams. Streams *remotecommand.Streams