Skip to content

Commit

Permalink
feature: add timeout handler for execsync in cri part
Browse files Browse the repository at this point in the history
Signed-off-by: Zou Rui <[email protected]>
  • Loading branch information
ZouRui89 committed May 11, 2018
1 parent bd54a92 commit 16ecda0
Showing 1 changed file with 51 additions and 21 deletions.
72 changes: 51 additions & 21 deletions daemon/mgr/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
Expand All @@ -20,7 +21,6 @@ import (
"github.com/alibaba/pouch/pkg/utils"
"github.com/alibaba/pouch/version"

// NOTE: "golang.org/x/net/context" is compatible with standard "context" in golang1.7+.
"github.com/cri-o/ocicni/pkg/ocicni"
"github.com/sirupsen/logrus"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
Expand Down Expand Up @@ -695,55 +695,85 @@ func (c *CriManager) UpdateContainerResources(ctx context.Context, r *runtime.Up
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) {
// TODO: handle timeout.
id := r.GetContainerId()

var cancel context.CancelFunc
timeout := r.GetTimeout()
if timeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout))
}
defer cancel()

createConfig := &apitypes.ExecCreateConfig{
Cmd: r.GetCmd(),
}

execid, err := c.ContainerMgr.CreateExec(ctx, id, createConfig)
if err != nil {
return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err)
}

reader, writer := io.Pipe()

var output bytes.Buffer
startConfig := &apitypes.ExecStartConfig{}
attachConfig := &AttachConfig{
Stdout: true,
Stderr: true,
MemBuffer: &output,
MuxDisabled: true,
}

go func() {
defer writer.Close()
for {
_, err = writer.Write(attachConfig.MemBuffer.Bytes())
if err != nil {
if err == io.EOF {
logrus.Infof("finish writing data to the pipe")
} else {
logrus.Errorf("failed to write data to io pipe: %v", err)
}
}
time.Sleep(100 * time.Millisecond)
}
}()

startConfig := &apitypes.ExecStartConfig{}

err = c.ContainerMgr.StartExec(ctx, execid, startConfig, attachConfig)
if err != nil {
return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err)
}

var execConfig *ContainerExecConfig
for {
execConfig, err = c.ContainerMgr.GetExecConfig(ctx, execid)
var recv bytes.Buffer
_, err = io.Copy(&recv, reader)
// no error will be returned if the error is EOF
if err != nil {
logrus.Errorf("failed to read data from the pipe: %v", err)
}
reader.Close()

select {
case <-ctx.Done():
return nil, fmt.Errorf("timeout")
default:
execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid)
if err != nil {
return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err)
}
// Loop until exec finished.
if !execConfig.Running {
break

var stderr []byte
if execConfig.Error != nil {
stderr = []byte(execConfig.Error.Error())
}
time.Sleep(100 * time.Millisecond)
}

var stderr []byte
if execConfig.Error != nil {
stderr = []byte(execConfig.Error.Error())
return &runtime.ExecSyncResponse{
Stdout: recv.Bytes(),
Stderr: stderr,
ExitCode: int32(execConfig.ExitCode),
}, nil
}

return &runtime.ExecSyncResponse{
Stdout: output.Bytes(),
Stderr: stderr,
ExitCode: int32(execConfig.ExitCode),
}, nil
}

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
Expand Down

0 comments on commit 16ecda0

Please sign in to comment.