Skip to content

Commit

Permalink
feature: add timeout handler for execsync in cri part
Browse files Browse the repository at this point in the history
Signed-off-by: Zou Rui <[email protected]>
  • Loading branch information
ZouRui89 committed May 14, 2018
1 parent 5a7236d commit 913fed4
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 77 deletions.
2 changes: 1 addition & 1 deletion daemon/containerio/cri_log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
const (
// delimiter used in cri logging format.
delimiter = ' '
// eof is end-of-line.
// eol is end-of-line.
eol = '\n'
// timestampFormat is the timestamp format used in cri logging format.
timestampFormat = time.RFC3339Nano
Expand Down
42 changes: 0 additions & 42 deletions daemon/containerio/mem_buffer.go

This file was deleted.

12 changes: 6 additions & 6 deletions daemon/containerio/options.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package containerio

import (
"bytes"
"io"
"net/http"
"os"

Expand All @@ -18,7 +18,7 @@ type Option struct {
hijack http.Hijacker
hijackUpgrade bool
stdinBackend string
memBuffer *bytes.Buffer
pipe *io.PipeWriter
streams *remotecommand.Streams
criLogFile *os.File
}
Expand Down Expand Up @@ -101,14 +101,14 @@ func WithStdinHijack() func(*Option) {
}
}

// WithMemBuffer specified the memory buffer backend.
func WithMemBuffer(memBuffer *bytes.Buffer) func(*Option) {
// WithPipe specified the pipe backend.
func WithPipe(pipe *io.PipeWriter) func(*Option) {
return func(opt *Option) {
if opt.backends == nil {
opt.backends = make(map[string]struct{})
}
opt.backends["memBuffer"] = struct{}{}
opt.memBuffer = memBuffer
opt.backends["pipe"] = struct{}{}
opt.pipe = pipe
}
}

Expand Down
41 changes: 41 additions & 0 deletions daemon/containerio/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package containerio

import (
"io"
)

func init() {
Register(func() Backend {
return &pipe{}
})
}

type pipe struct {
pipeWriter *io.PipeWriter
}

func (p *pipe) Name() string {
return "pipe"
}

func (p *pipe) Init(opt *Option) error {
p.pipeWriter = opt.pipe
return nil
}

func (p *pipe) Out() io.Writer {
return p.pipeWriter
}

func (p *pipe) In() io.Reader {
return nil
}

func (p *pipe) Err() io.Writer {
return p.pipeWriter
}

func (p *pipe) Close() error {
return p.pipeWriter.Close()

}
6 changes: 3 additions & 3 deletions daemon/mgr/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -1531,9 +1531,9 @@ func attachConfigToOptions(attach *AttachConfig) []func(*containerio.Option) {
if attach.Stdin {
options = append(options, containerio.WithStdinHijack())
}
} else if attach.MemBuffer != nil {
// Attaching using memory buffer.
options = append(options, containerio.WithMemBuffer(attach.MemBuffer))
} else if attach.Pipe != nil {
// Attaching using pipe.
options = append(options, containerio.WithPipe(attach.Pipe))
} else if attach.Streams != nil {
// Attaching using streams.
options = append(options, containerio.WithStreams(attach.Streams))
Expand Down
4 changes: 2 additions & 2 deletions daemon/mgr/container_types.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package mgr

import (
"bytes"
"fmt"
"io"
"net/http"
"os"
"sync"
Expand Down Expand Up @@ -72,7 +72,7 @@ type AttachConfig struct {
Upgrade bool

// Attach using memory buffer.
MemBuffer *bytes.Buffer
Pipe *io.PipeWriter

// Attach using streams.
Streams *remotecommand.Streams
Expand Down
66 changes: 43 additions & 23 deletions daemon/mgr/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
Expand All @@ -20,7 +21,6 @@ import (
"github.com/alibaba/pouch/pkg/utils"
"github.com/alibaba/pouch/version"

// NOTE: "golang.org/x/net/context" is compatible with standard "context" in golang1.7+.
"github.com/cri-o/ocicni/pkg/ocicni"
"github.com/sirupsen/logrus"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
Expand Down Expand Up @@ -695,55 +695,75 @@ func (c *CriManager) UpdateContainerResources(ctx context.Context, r *runtime.Up
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) {
// TODO: handle timeout.
id := r.GetContainerId()

timeout := time.Duration(r.GetTimeout()) * time.Second
var cancel context.CancelFunc
if timeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, timeout)
}
defer cancel()

createConfig := &apitypes.ExecCreateConfig{
Cmd: r.GetCmd(),
}

execid, err := c.ContainerMgr.CreateExec(ctx, id, createConfig)
if err != nil {
return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err)
}

var output bytes.Buffer
startConfig := &apitypes.ExecStartConfig{}
reader, writer := io.Pipe()
defer writer.Close()

attachConfig := &AttachConfig{
Stdout: true,
Stderr: true,
MemBuffer: &output,
Pipe: writer,
MuxDisabled: true,
}

startConfig := &apitypes.ExecStartConfig{}

err = c.ContainerMgr.StartExec(ctx, execid, startConfig, attachConfig)
if err != nil {
return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err)
}

var execConfig *ContainerExecConfig
for {
execConfig, err = c.ContainerMgr.GetExecConfig(ctx, execid)
readWaitCh := make(chan error, 1)
var recv bytes.Buffer
go func() {
defer reader.Close()
_, err = io.Copy(&recv, reader)
readWaitCh <- err
}()

select {
case <-ctx.Done():
//TODO maybe stop the execution?
return nil, fmt.Errorf("timeout %v exceeded", timeout)
case <-readWaitCh:
checkError := <-readWaitCh
if checkError != nil {
return nil, fmt.Errorf("failed while waiting for exec %q: %v", execid, err)
}
execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid)
if err != nil {
return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err)
}
// Loop until exec finished.
if !execConfig.Running {
break

var stderr []byte
if execConfig.Error != nil {
stderr = []byte(execConfig.Error.Error())
}
time.Sleep(100 * time.Millisecond)
}

var stderr []byte
if execConfig.Error != nil {
stderr = []byte(execConfig.Error.Error())
return &runtime.ExecSyncResponse{
Stdout: recv.Bytes(),
Stderr: stderr,
ExitCode: int32(execConfig.ExitCode),
}, nil
}

return &runtime.ExecSyncResponse{
Stdout: output.Bytes(),
Stderr: stderr,
ExitCode: int32(execConfig.ExitCode),
}, nil
}

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
Expand Down

0 comments on commit 913fed4

Please sign in to comment.