diff --git a/daemon/mgr/cri.go b/daemon/mgr/cri.go index 5e14162e7e..5ff61b7a41 100644 --- a/daemon/mgr/cri.go +++ b/daemon/mgr/cri.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "os" "path" "path/filepath" @@ -20,7 +21,6 @@ import ( "github.com/alibaba/pouch/pkg/utils" "github.com/alibaba/pouch/version" - // NOTE: "golang.org/x/net/context" is compatible with standard "context" in golang1.7+. "github.com/cri-o/ocicni/pkg/ocicni" "github.com/sirupsen/logrus" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -695,20 +695,28 @@ func (c *CriManager) UpdateContainerResources(ctx context.Context, r *runtime.Up // ExecSync executes a command in the container, and returns the stdout output. // If command exits with a non-zero exit code, an error is returned. func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) { - // TODO: handle timeout. id := r.GetContainerId() + var cancel context.CancelFunc + timeout := r.GetTimeout() + if timeout == 0 { + ctx, cancel = context.WithCancel(ctx) + } else { + ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)) + } + defer cancel() + createConfig := &apitypes.ExecCreateConfig{ Cmd: r.GetCmd(), } - execid, err := c.ContainerMgr.CreateExec(ctx, id, createConfig) if err != nil { return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err) } + reader, writer := io.Pipe() + var output bytes.Buffer - startConfig := &apitypes.ExecStartConfig{} attachConfig := &AttachConfig{ Stdout: true, Stderr: true, @@ -716,34 +724,56 @@ func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) ( MuxDisabled: true, } + go func() { + defer writer.Close() + for { + _, err = writer.Write(attachConfig.MemBuffer.Bytes()) + if err != nil { + if err == io.EOF { + fmt.Println("finish writing data to the pipe") + } else { + fmt.Errorf("failed to write data to io pipe: %v", err) + } + } + time.Sleep(100 * time.Millisecond) + } + }() + + startConfig := &apitypes.ExecStartConfig{} + err = c.ContainerMgr.StartExec(ctx, execid, startConfig, attachConfig) if err != nil { return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err) } - var execConfig *ContainerExecConfig - for { - execConfig, err = c.ContainerMgr.GetExecConfig(ctx, execid) + var recv bytes.Buffer + _, err = io.Copy(&recv, reader) + // no error will be returned if the error is EOF + if err != nil { + fmt.Errorf("failed to read data from the pipe: %v", err) + } + reader.Close() + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("timeout") + default: + execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid) if err != nil { return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err) } - // Loop until exec finished. - if !execConfig.Running { - break + + var stderr []byte + if execConfig.Error != nil { + stderr = []byte(execConfig.Error.Error()) } - time.Sleep(100 * time.Millisecond) - } - var stderr []byte - if execConfig.Error != nil { - stderr = []byte(execConfig.Error.Error()) + return &runtime.ExecSyncResponse{ + Stdout: recv.Bytes(), + Stderr: stderr, + ExitCode: int32(execConfig.ExitCode), + }, nil } - - return &runtime.ExecSyncResponse{ - Stdout: output.Bytes(), - Stderr: stderr, - ExitCode: int32(execConfig.ExitCode), - }, nil } // Exec prepares a streaming endpoint to execute a command in the container, and returns the address.