Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add timeout handler for execSync in cri part #1318

Merged
merged 1 commit into from
May 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 42 additions & 22 deletions cri/src/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -696,55 +697,74 @@ func (c *CriManager) UpdateContainerResources(ctx context.Context, r *runtime.Up
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) {
// TODO: handle timeout.
id := r.GetContainerId()

timeout := time.Duration(r.GetTimeout()) * time.Second
var cancel context.CancelFunc
if timeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, timeout)
}
defer cancel()

createConfig := &apitypes.ExecCreateConfig{
Cmd: r.GetCmd(),
}

execid, err := c.ContainerMgr.CreateExec(ctx, id, createConfig)
if err != nil {
return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err)
}

var output bytes.Buffer
startConfig := &apitypes.ExecStartConfig{}
reader, writer := io.Pipe()
defer writer.Close()

attachConfig := &mgr.AttachConfig{
Stdout: true,
Stderr: true,
MemBuffer: &output,
Pipe: writer,
MuxDisabled: true,
}

startConfig := &apitypes.ExecStartConfig{}

err = c.ContainerMgr.StartExec(ctx, execid, startConfig, attachConfig)
if err != nil {
return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err)
}

var execConfig *mgr.ContainerExecConfig
for {
execConfig, err = c.ContainerMgr.GetExecConfig(ctx, execid)
readWaitCh := make(chan error, 1)
var recv bytes.Buffer
go func() {
defer reader.Close()
_, err = io.Copy(&recv, reader)
readWaitCh <- err
}()

select {
case <-ctx.Done():
//TODO maybe stop the execution?
return nil, fmt.Errorf("timeout %v exceeded", timeout)
case readWaitErr := <-readWaitCh:
if readWaitErr != nil {
return nil, fmt.Errorf("failed to read data from the pipe: %v", err)
}
execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid)
if err != nil {
return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err)
}
// Loop until exec finished.
if !execConfig.Running {
break

var stderr []byte
if execConfig.Error != nil {
stderr = []byte(execConfig.Error.Error())
}
time.Sleep(100 * time.Millisecond)
}

var stderr []byte
if execConfig.Error != nil {
stderr = []byte(execConfig.Error.Error())
return &runtime.ExecSyncResponse{
Stdout: recv.Bytes(),
Stderr: stderr,
ExitCode: int32(execConfig.ExitCode),
}, nil
}

return &runtime.ExecSyncResponse{
Stdout: output.Bytes(),
Stderr: stderr,
ExitCode: int32(execConfig.ExitCode),
}, nil
}

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
Expand Down
42 changes: 0 additions & 42 deletions daemon/containerio/mem_buffer.go

This file was deleted.

12 changes: 6 additions & 6 deletions daemon/containerio/options.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package containerio

import (
"bytes"
"io"
"net/http"
"os"

Expand All @@ -18,7 +18,7 @@ type Option struct {
hijack http.Hijacker
hijackUpgrade bool
stdinBackend string
memBuffer *bytes.Buffer
pipe *io.PipeWriter
streams *remotecommand.Streams
criLogFile *os.File
}
Expand Down Expand Up @@ -101,14 +101,14 @@ func WithStdinHijack() func(*Option) {
}
}

// WithMemBuffer specified the memory buffer backend.
func WithMemBuffer(memBuffer *bytes.Buffer) func(*Option) {
// WithPipe specified the pipe backend.
func WithPipe(pipe *io.PipeWriter) func(*Option) {
return func(opt *Option) {
if opt.backends == nil {
opt.backends = make(map[string]struct{})
}
opt.backends["memBuffer"] = struct{}{}
opt.memBuffer = memBuffer
opt.backends["pipe"] = struct{}{}
opt.pipe = pipe
}
}

Expand Down
40 changes: 40 additions & 0 deletions daemon/containerio/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package containerio

import (
"io"
)

func init() {
Register(func() Backend {
return &pipe{}
})
}

type pipe struct {
pipeWriter *io.PipeWriter
}

func (p *pipe) Name() string {
return "pipe"
}

func (p *pipe) Init(opt *Option) error {
p.pipeWriter = opt.pipe
return nil
}

func (p *pipe) Out() io.Writer {
return p.pipeWriter
}

func (p *pipe) In() io.Reader {
return nil
}

func (p *pipe) Err() io.Writer {
return p.pipeWriter
}

func (p *pipe) Close() error {
return p.pipeWriter.Close()
}
6 changes: 3 additions & 3 deletions daemon/mgr/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -1544,9 +1544,9 @@ func attachConfigToOptions(attach *AttachConfig) []func(*containerio.Option) {
if attach.Stdin {
options = append(options, containerio.WithStdinHijack())
}
} else if attach.MemBuffer != nil {
// Attaching using memory buffer.
options = append(options, containerio.WithMemBuffer(attach.MemBuffer))
} else if attach.Pipe != nil {
// Attaching using pipe.
options = append(options, containerio.WithPipe(attach.Pipe))
} else if attach.Streams != nil {
// Attaching using streams.
options = append(options, containerio.WithStreams(attach.Streams))
Expand Down
6 changes: 3 additions & 3 deletions daemon/mgr/container_types.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package mgr

import (
"bytes"
"fmt"
"io"
"net/http"
"os"
"sync"
Expand Down Expand Up @@ -71,8 +71,8 @@ type AttachConfig struct {
Hijack http.Hijacker
Upgrade bool

// Attach using memory buffer.
MemBuffer *bytes.Buffer
// Attach using pipe.
Pipe *io.PipeWriter

// Attach using streams.
Streams *remotecommand.Streams
Expand Down