Skip to content

Commit

Permalink
feature: add timeout handler for execsync in cri part
Browse files Browse the repository at this point in the history
Signed-off-by: Zou Rui <[email protected]>
  • Loading branch information
ZouRui89 committed May 11, 2018
1 parent bd54a92 commit 9f3b771
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 31 deletions.
5 changes: 2 additions & 3 deletions daemon/containerio/mem_buffer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package containerio

import (
"bytes"
"io"
)

Expand All @@ -12,7 +11,7 @@ func init() {
}

type memBuffer struct {
buffer *bytes.Buffer
buffer *io.PipeWriter
}

func (b *memBuffer) Name() string {
Expand All @@ -29,7 +28,7 @@ func (b *memBuffer) Out() io.Writer {
}

func (b *memBuffer) In() io.Reader {
return b.buffer
return nil
}

func (b *memBuffer) Err() io.Writer {
Expand Down
6 changes: 3 additions & 3 deletions daemon/containerio/options.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package containerio

import (
"bytes"
"io"
"net/http"
"os"

Expand All @@ -18,7 +18,7 @@ type Option struct {
hijack http.Hijacker
hijackUpgrade bool
stdinBackend string
memBuffer *bytes.Buffer
memBuffer *io.PipeWriter
streams *remotecommand.Streams
criLogFile *os.File
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func WithStdinHijack() func(*Option) {
}

// WithMemBuffer specified the memory buffer backend.
func WithMemBuffer(memBuffer *bytes.Buffer) func(*Option) {
func WithMemBuffer(memBuffer *io.PipeWriter) func(*Option) {
return func(opt *Option) {
if opt.backends == nil {
opt.backends = make(map[string]struct{})
Expand Down
4 changes: 2 additions & 2 deletions daemon/mgr/container_types.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package mgr

import (
"bytes"
"fmt"
"io"
"net/http"
"os"
"sync"
Expand Down Expand Up @@ -72,7 +72,7 @@ type AttachConfig struct {
Upgrade bool

// Attach using memory buffer.
MemBuffer *bytes.Buffer
MemBuffer *io.PipeWriter

// Attach using streams.
Streams *remotecommand.Streams
Expand Down
64 changes: 41 additions & 23 deletions daemon/mgr/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
Expand All @@ -20,7 +21,6 @@ import (
"github.com/alibaba/pouch/pkg/utils"
"github.com/alibaba/pouch/version"

// NOTE: "golang.org/x/net/context" is compatible with standard "context" in golang1.7+.
"github.com/cri-o/ocicni/pkg/ocicni"
"github.com/sirupsen/logrus"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
Expand Down Expand Up @@ -695,55 +695,73 @@ func (c *CriManager) UpdateContainerResources(ctx context.Context, r *runtime.Up
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) {
// TODO: handle timeout.
id := r.GetContainerId()

var cancel context.CancelFunc
timeout := r.GetTimeout()
if timeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout))
}
defer cancel()

createConfig := &apitypes.ExecCreateConfig{
Cmd: r.GetCmd(),
}

execid, err := c.ContainerMgr.CreateExec(ctx, id, createConfig)
if err != nil {
return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err)
}

var output bytes.Buffer
startConfig := &apitypes.ExecStartConfig{}
reader, writer := io.Pipe()
defer writer.Close()

attachConfig := &AttachConfig{
Stdout: true,
Stderr: true,
MemBuffer: &output,
MemBuffer: writer,
MuxDisabled: true,
}

startConfig := &apitypes.ExecStartConfig{}

err = c.ContainerMgr.StartExec(ctx, execid, startConfig, attachConfig)
if err != nil {
return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err)
}

var execConfig *ContainerExecConfig
for {
execConfig, err = c.ContainerMgr.GetExecConfig(ctx, execid)
readWaitCh := make(chan error, 1)
var recv bytes.Buffer
go func() {
defer reader.Close()
_, err = io.Copy(&recv, reader)
if err != nil {
logrus.Infof("failed to read data from the pipe")
}
readWaitCh <- err
}()

select {
case <-ctx.Done():
return nil, fmt.Errorf("timeout")
case <-readWaitCh:
execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid)
if err != nil {
return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err)
}
// Loop until exec finished.
if !execConfig.Running {
break

var stderr []byte
if execConfig.Error != nil {
stderr = []byte(execConfig.Error.Error())
}
time.Sleep(100 * time.Millisecond)
}

var stderr []byte
if execConfig.Error != nil {
stderr = []byte(execConfig.Error.Error())
return &runtime.ExecSyncResponse{
Stdout: recv.Bytes(),
Stderr: stderr,
ExitCode: int32(execConfig.ExitCode),
}, nil
}

return &runtime.ExecSyncResponse{
Stdout: output.Bytes(),
Stderr: stderr,
ExitCode: int32(execConfig.ExitCode),
}, nil
}

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
Expand Down

0 comments on commit 9f3b771

Please sign in to comment.