Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add timeout handler for execSync in cri part #1280

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion daemon/containerio/cri_log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
const (
// delimiter used in cri logging format.
delimiter = ' '
// eof is end-of-line.
// eol is end-of-line.
eol = '\n'
// timestampFormat is the timestamp format used in cri logging format.
timestampFormat = time.RFC3339Nano
Expand Down
42 changes: 0 additions & 42 deletions daemon/containerio/mem_buffer.go

This file was deleted.

12 changes: 6 additions & 6 deletions daemon/containerio/options.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package containerio

import (
"bytes"
"io"
"net/http"
"os"

Expand All @@ -18,7 +18,7 @@ type Option struct {
hijack http.Hijacker
hijackUpgrade bool
stdinBackend string
memBuffer *bytes.Buffer
pipe *io.PipeWriter
streams *remotecommand.Streams
criLogFile *os.File
}
Expand Down Expand Up @@ -101,14 +101,14 @@ func WithStdinHijack() func(*Option) {
}
}

// WithMemBuffer specified the memory buffer backend.
func WithMemBuffer(memBuffer *bytes.Buffer) func(*Option) {
// WithPipe specified the pipe backend.
func WithPipe(pipe *io.PipeWriter) func(*Option) {
return func(opt *Option) {
if opt.backends == nil {
opt.backends = make(map[string]struct{})
}
opt.backends["memBuffer"] = struct{}{}
opt.memBuffer = memBuffer
opt.backends["pipe"] = struct{}{}
opt.pipe = pipe
}
}

Expand Down
41 changes: 41 additions & 0 deletions daemon/containerio/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package containerio

import (
"io"
)

func init() {
Register(func() Backend {
return &pipe{}
})
}

type pipe struct {
pipeWriter *io.PipeWriter
}

func (p *pipe) Name() string {
return "pipe"
}

func (p *pipe) Init(opt *Option) error {
p.pipeWriter = opt.pipe
return nil
}

func (p *pipe) Out() io.Writer {
return p.pipeWriter
}

func (p *pipe) In() io.Reader {
return nil
}

func (p *pipe) Err() io.Writer {
return p.pipeWriter
}

func (p *pipe) Close() error {
return p.pipeWriter.Close()

}
6 changes: 3 additions & 3 deletions daemon/mgr/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -1531,9 +1531,9 @@ func attachConfigToOptions(attach *AttachConfig) []func(*containerio.Option) {
if attach.Stdin {
options = append(options, containerio.WithStdinHijack())
}
} else if attach.MemBuffer != nil {
// Attaching using memory buffer.
options = append(options, containerio.WithMemBuffer(attach.MemBuffer))
} else if attach.Pipe != nil {
// Attaching using pipe.
options = append(options, containerio.WithPipe(attach.Pipe))
} else if attach.Streams != nil {
// Attaching using streams.
options = append(options, containerio.WithStreams(attach.Streams))
Expand Down
4 changes: 2 additions & 2 deletions daemon/mgr/container_types.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package mgr

import (
"bytes"
"fmt"
"io"
"net/http"
"os"
"sync"
Expand Down Expand Up @@ -72,7 +72,7 @@ type AttachConfig struct {
Upgrade bool

// Attach using memory buffer.
MemBuffer *bytes.Buffer
Pipe *io.PipeWriter

// Attach using streams.
Streams *remotecommand.Streams
Expand Down
66 changes: 43 additions & 23 deletions daemon/mgr/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
Expand All @@ -20,7 +21,6 @@ import (
"github.com/alibaba/pouch/pkg/utils"
"github.com/alibaba/pouch/version"

// NOTE: "golang.org/x/net/context" is compatible with standard "context" in golang1.7+.
"github.com/cri-o/ocicni/pkg/ocicni"
"github.com/sirupsen/logrus"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
Expand Down Expand Up @@ -695,55 +695,75 @@ func (c *CriManager) UpdateContainerResources(ctx context.Context, r *runtime.Up
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) {
// TODO: handle timeout.
id := r.GetContainerId()

timeout := time.Duration(r.GetTimeout()) * time.Second
var cancel context.CancelFunc
if timeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, timeout)
}
defer cancel()

createConfig := &apitypes.ExecCreateConfig{
Cmd: r.GetCmd(),
}

execid, err := c.ContainerMgr.CreateExec(ctx, id, createConfig)
if err != nil {
return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err)
}

var output bytes.Buffer
startConfig := &apitypes.ExecStartConfig{}
reader, writer := io.Pipe()
defer writer.Close()

attachConfig := &AttachConfig{
Stdout: true,
Stderr: true,
MemBuffer: &output,
Pipe: writer,
MuxDisabled: true,
}

startConfig := &apitypes.ExecStartConfig{}

err = c.ContainerMgr.StartExec(ctx, execid, startConfig, attachConfig)
if err != nil {
return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err)
}

var execConfig *ContainerExecConfig
for {
execConfig, err = c.ContainerMgr.GetExecConfig(ctx, execid)
readWaitCh := make(chan error, 1)
var recv bytes.Buffer
go func() {
defer reader.Close()
_, err = io.Copy(&recv, reader)
readWaitCh <- err
}()

select {
case <-ctx.Done():
//TODO maybe stop the execution?
return nil, fmt.Errorf("timeout %v exceeded", timeout)
case <-readWaitCh:
checkError := <-readWaitCh
if checkError != nil {
return nil, fmt.Errorf("failed while waiting for exec %q: %v", execid, err)
}
execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid)
if err != nil {
return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err)
}
// Loop until exec finished.
if !execConfig.Running {
break

var stderr []byte
if execConfig.Error != nil {
stderr = []byte(execConfig.Error.Error())
}
time.Sleep(100 * time.Millisecond)
}

var stderr []byte
if execConfig.Error != nil {
stderr = []byte(execConfig.Error.Error())
return &runtime.ExecSyncResponse{
Stdout: recv.Bytes(),
Stderr: stderr,
ExitCode: int32(execConfig.ExitCode),
}, nil
}

return &runtime.ExecSyncResponse{
Stdout: output.Bytes(),
Stderr: stderr,
ExitCode: int32(execConfig.ExitCode),
}, nil
}

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
Expand Down