From 97312bbf8401d17cdebed4ce9a47c979cfdd80a3 Mon Sep 17 00:00:00 2001 From: Zou Rui <21751189@zju.edu.cn> Date: Tue, 8 May 2018 15:00:46 +0800 Subject: [PATCH] feature: add timeout handler for execsync in cri part Signed-off-by: Zou Rui <21751189@zju.edu.cn> --- daemon/containerio/mem_buffer.go | 5 +-- daemon/containerio/options.go | 6 +-- daemon/mgr/container_types.go | 4 +- daemon/mgr/cri.go | 73 ++++++++++++++++++++++---------- 4 files changed, 57 insertions(+), 31 deletions(-) diff --git a/daemon/containerio/mem_buffer.go b/daemon/containerio/mem_buffer.go index aaa4bf99df..41d8a67e9d 100644 --- a/daemon/containerio/mem_buffer.go +++ b/daemon/containerio/mem_buffer.go @@ -1,7 +1,6 @@ package containerio import ( - "bytes" "io" ) @@ -12,7 +11,7 @@ func init() { } type memBuffer struct { - buffer *bytes.Buffer + buffer *io.PipeWriter } func (b *memBuffer) Name() string { @@ -29,7 +28,7 @@ func (b *memBuffer) Out() io.Writer { } func (b *memBuffer) In() io.Reader { - return b.buffer + return nil } func (b *memBuffer) Err() io.Writer { diff --git a/daemon/containerio/options.go b/daemon/containerio/options.go index 2b3ca8e885..6267f9803d 100644 --- a/daemon/containerio/options.go +++ b/daemon/containerio/options.go @@ -1,7 +1,7 @@ package containerio import ( - "bytes" + "io" "net/http" "os" @@ -18,7 +18,7 @@ type Option struct { hijack http.Hijacker hijackUpgrade bool stdinBackend string - memBuffer *bytes.Buffer + memBuffer *io.PipeWriter streams *remotecommand.Streams criLogFile *os.File } @@ -102,7 +102,7 @@ func WithStdinHijack() func(*Option) { } // WithMemBuffer specified the memory buffer backend. -func WithMemBuffer(memBuffer *bytes.Buffer) func(*Option) { +func WithMemBuffer(memBuffer *io.PipeWriter) func(*Option) { return func(opt *Option) { if opt.backends == nil { opt.backends = make(map[string]struct{}) diff --git a/daemon/mgr/container_types.go b/daemon/mgr/container_types.go index 4dd33e1503..efaa12b521 100644 --- a/daemon/mgr/container_types.go +++ b/daemon/mgr/container_types.go @@ -1,8 +1,8 @@ package mgr import ( - "bytes" "fmt" + "io" "net/http" "os" "sync" @@ -72,7 +72,7 @@ type AttachConfig struct { Upgrade bool // Attach using memory buffer. - MemBuffer *bytes.Buffer + MemBuffer *io.PipeWriter // Attach using streams. Streams *remotecommand.Streams diff --git a/daemon/mgr/cri.go b/daemon/mgr/cri.go index 5e14162e7e..919454bcff 100644 --- a/daemon/mgr/cri.go +++ b/daemon/mgr/cri.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "os" "path" "path/filepath" @@ -20,7 +21,6 @@ import ( "github.com/alibaba/pouch/pkg/utils" "github.com/alibaba/pouch/version" - // NOTE: "golang.org/x/net/context" is compatible with standard "context" in golang1.7+. "github.com/cri-o/ocicni/pkg/ocicni" "github.com/sirupsen/logrus" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -695,55 +695,82 @@ func (c *CriManager) UpdateContainerResources(ctx context.Context, r *runtime.Up // ExecSync executes a command in the container, and returns the stdout output. // If command exits with a non-zero exit code, an error is returned. func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) { - // TODO: handle timeout. id := r.GetContainerId() + /*var cancel context.CancelFunc + timeout := r.GetTimeout() + if timeout == 0 { + ctx, cancel = context.WithCancel(ctx) + } else { + ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)) + } + defer cancel() + */ + var timeoutCh <-chan time.Time + timeout := r.GetTimeout() + if timeout == 0 { + // Do not set timeout if it's 0. + timeoutCh = make(chan time.Time) + } else { + timeoutCh = time.After(time.Duration(timeout)) + } + createConfig := &apitypes.ExecCreateConfig{ Cmd: r.GetCmd(), } - execid, err := c.ContainerMgr.CreateExec(ctx, id, createConfig) if err != nil { return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err) } - var output bytes.Buffer - startConfig := &apitypes.ExecStartConfig{} + reader, writer := io.Pipe() + defer writer.Close() + attachConfig := &AttachConfig{ Stdout: true, Stderr: true, - MemBuffer: &output, + MemBuffer: writer, MuxDisabled: true, } + startConfig := &apitypes.ExecStartConfig{} + err = c.ContainerMgr.StartExec(ctx, execid, startConfig, attachConfig) if err != nil { return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err) } - var execConfig *ContainerExecConfig - for { - execConfig, err = c.ContainerMgr.GetExecConfig(ctx, execid) + readWaitCh := make(chan error, 1) + var recv bytes.Buffer + go func() { + defer reader.Close() + _, err = io.Copy(&recv, reader) + if err != nil { + logrus.Infof("failed to read data from the pipe") + } + readWaitCh <- err + }() + + select { + case <-timeoutCh: + return nil, fmt.Errorf("timeout hhhhhhhhhhhhhh") + case <-readWaitCh: + execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid) if err != nil { return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err) } - // Loop until exec finished. - if !execConfig.Running { - break + + var stderr []byte + if execConfig.Error != nil { + stderr = []byte(execConfig.Error.Error()) } - time.Sleep(100 * time.Millisecond) - } - var stderr []byte - if execConfig.Error != nil { - stderr = []byte(execConfig.Error.Error()) + return &runtime.ExecSyncResponse{ + Stdout: recv.Bytes(), + Stderr: stderr, + ExitCode: int32(execConfig.ExitCode), + }, nil } - - return &runtime.ExecSyncResponse{ - Stdout: output.Bytes(), - Stderr: stderr, - ExitCode: int32(execConfig.ExitCode), - }, nil } // Exec prepares a streaming endpoint to execute a command in the container, and returns the address.