From a00e255e4478f8fda515def01e469e47ff0ff89c Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Mon, 29 Oct 2018 14:41:15 +0800
Subject: [PATCH] refactor: redesign container io in pouch

Signed-off-by: Wei Fu
---
 apis/server/container_bridge.go        |  42 ++-
 apis/server/exec_bridge.go             |  65 ++++-
 cli/exec.go                            |   6 +-
 cli/run.go                             |   5 +
 cri/stream/remotecommand/attach.go     |   1 -
 cri/stream/remotecommand/httpstream.go |   2 -
 cri/stream/runtime.go                  |  62 ++--
 cri/v1alpha1/cri.go                    |  71 ++---
 cri/v1alpha1/cri_utils.go              |  19 --
 cri/v1alpha2/cri.go                    |  66 ++---
 cri/v1alpha2/cri_utils.go              |  19 --
 ctrd/container.go                      | 204 +++++++++++--
 ctrd/watch.go                          |  16 +-
 daemon/containerio/backend.go          |  33 ---
 daemon/containerio/cio.go              | 297 +++++++++----------
 daemon/containerio/container_io.go     | 378 -------------------------
 daemon/containerio/cri_log_file.go     | 121 --------
 daemon/containerio/discard.go          |  47 ---
 daemon/containerio/hijack_conn.go      |  77 -----
 daemon/containerio/io.go               | 197 +++++++++++++
 daemon/containerio/jsonfile.go         | 162 -----------
 daemon/containerio/options.go          | 156 ----------
 daemon/containerio/pipe.go             |  40 ---
 daemon/containerio/streams.go          |  64 -----
 daemon/containerio/syslog.go           |  73 -----
 daemon/logger/copier.go                |  81 ++++++
 daemon/logger/copier_test.go           |  93 ++++++
 daemon/logger/crilog/log.go            | 150 ++++++++++
 daemon/logger/jsonfile/jsonfile.go     |  36 +++
 daemon/logger/jsonfile/utils_test.go   |   2 +
 daemon/logger/syslog/syslog.go         |  10 +
 daemon/logger/syslog/syslog_test.go    |   2 +
 daemon/logger/types.go                 |  10 +
 daemon/mgr/container.go                | 204 +++++++------
 daemon/mgr/container_exec.go           |  79 +++---
 daemon/mgr/container_logger.go         |  15 +-
 pkg/ioutils/writer.go                  |  43 +++
 pkg/streams/multi.go                   |  52 ++++
 pkg/streams/multi_test.go              |  66 +++++
 pkg/streams/stream.go                  | 103 +++++++
 pkg/streams/utils.go                   | 185 ++++++++++++
 pkg/streams/utils_test.go              |  75 +++++
 test/cli_exec_test.go                  |  19 ++
 test/cli_run_interactive_test.go       |  84 ++++++
 44 files changed, 1826 insertions(+), 1706 deletions(-)
 delete mode 100644 daemon/containerio/backend.go
 delete mode 100644 daemon/containerio/container_io.go
 delete mode 100644 daemon/containerio/cri_log_file.go
 delete mode 100644 daemon/containerio/discard.go
 delete mode 100644 daemon/containerio/hijack_conn.go
 create mode 100644 daemon/containerio/io.go
 delete mode 100644 daemon/containerio/jsonfile.go
 delete mode 100644 daemon/containerio/options.go
 delete mode 100644 daemon/containerio/pipe.go
 delete mode 100644 daemon/containerio/streams.go
 delete mode 100644 daemon/containerio/syslog.go
 create mode 100644 daemon/logger/copier.go
 create mode 100644 daemon/logger/copier_test.go
 create mode 100644 daemon/logger/crilog/log.go
 create mode 100644 daemon/logger/types.go
 create mode 100644 pkg/ioutils/writer.go
 create mode 100644 pkg/streams/multi.go
 create mode 100644 pkg/streams/multi_test.go
 create mode 100644 pkg/streams/stream.go
 create mode 100644 pkg/streams/utils.go
 create mode 100644 pkg/streams/utils_test.go
 create mode 100644 test/cli_run_interactive_test.go

diff --git a/apis/server/container_bridge.go b/apis/server/container_bridge.go
index da355b936..9edb0801c 100644
--- a/apis/server/container_bridge.go
+++ b/apis/server/container_bridge.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
 	"net/http"
 	"strconv"
 	"strings"
@@ -13,6 +14,7 @@ import (
 	"github.com/alibaba/pouch/apis/types"
 	"github.com/alibaba/pouch/daemon/mgr"
 	"github.com/alibaba/pouch/pkg/httputils"
+	"github.com/alibaba/pouch/pkg/streams"
 	"github.com/alibaba/pouch/pkg/utils"
 	"github.com/alibaba/pouch/pkg/utils/filters"
 
@@
-284,26 +286,40 @@ func (s *Server) renameContainer(ctx context.Context, rw http.ResponseWriter, re func (s *Server) attachContainer(ctx context.Context, rw http.ResponseWriter, req *http.Request) error { name := mux.Vars(req)["name"] - _, upgrade := req.Header["Upgrade"] - hijacker, ok := rw.(http.Hijacker) - if !ok { - return fmt.Errorf("not a hijack connection, container: %s", name) - } + var ( + err error + closeFn func() error + attach = new(streams.AttachConfig) + stdin io.ReadCloser + stdout io.Writer + ) - attach := &mgr.AttachConfig{ - Hijack: hijacker, - Stdin: req.FormValue("stdin") == "1", - Stdout: true, - Stderr: true, - Upgrade: upgrade, + stdin, stdout, closeFn, err = openHijackConnection(rw) + if err != nil { + return err } - if err := s.ContainerMgr.Attach(ctx, name, attach); err != nil { - // TODO handle error + // close hijack stream + defer closeFn() + + if upgrade { + fmt.Fprintf(stdout, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n") + } else { + fmt.Fprintf(stdout, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n") } + attach.UseStdin = httputils.BoolValue(req, "stdin") + attach.Stdin = stdin + attach.UseStdout = true + attach.Stdout = stdout + attach.UseStderr = true + attach.Stderr = stdout + + if err := s.ContainerMgr.AttachContainerIO(ctx, name, attach); err != nil { + stdout.Write([]byte(err.Error() + "\r\n")) + } return nil } diff --git a/apis/server/exec_bridge.go b/apis/server/exec_bridge.go index f3467d7a6..a419cf683 100644 --- a/apis/server/exec_bridge.go +++ b/apis/server/exec_bridge.go @@ -4,13 +4,15 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "strconv" "github.com/alibaba/pouch/apis/types" - "github.com/alibaba/pouch/daemon/mgr" "github.com/alibaba/pouch/pkg/httputils" + "github.com/alibaba/pouch/pkg/streams" + "github.com/docker/docker/pkg/stdcopy" "github.com/go-openapi/strfmt" "github.com/gorilla/mux" "github.com/sirupsen/logrus" @@ -64,28 +66,47 @@ func (s *Server) startContainerExec(ctx context.Context, rw http.ResponseWriter, return err } - var attach *mgr.AttachConfig + var ( + err error + closeFn func() error + attach = new(streams.AttachConfig) + stdin io.ReadCloser + stdout io.Writer + ) // TODO(huamin.thm): support detach exec process through http post method if !config.Detach { - hijacker, ok := rw.(http.Hijacker) - if !ok { - return fmt.Errorf("not a hijack connection, container: %s", name) + stdin, stdout, closeFn, err = openHijackConnection(rw) + if err != nil { + return err } - attach = &mgr.AttachConfig{ - Hijack: hijacker, - Stdin: config.Tty, - Stdout: true, - Stderr: true, - Upgrade: upgrade, + // close hijack stream + defer closeFn() + + if upgrade { + fmt.Fprintf(stdout, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n") + } else { + fmt.Fprintf(stdout, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n") + } + + attach.UseStdin, attach.Stdin = true, stdin + attach.Terminal = config.Tty + + if config.Tty { + attach.UseStdout, attach.Stdout = true, stdout + } else { + attach.UseStdout, attach.Stdout = true, stdcopy.NewStdWriter(stdout, stdcopy.Stdout) + attach.UseStderr, attach.Stderr = true, stdcopy.NewStdWriter(stdout, stdcopy.Stderr) } } if err := s.ContainerMgr.StartExec(ctx, name, attach); err != nil { - logrus.Errorf("failed to run exec process: %s", err) + if config.Detach { + return err + } + 
attach.Stdout.Write([]byte(err.Error() + "\r\n")) } - return nil } @@ -124,3 +145,21 @@ func (s *Server) resizeExec(ctx context.Context, rw http.ResponseWriter, req *ht return nil } + +func openHijackConnection(rw http.ResponseWriter) (io.ReadCloser, io.Writer, func() error, error) { + hijacker, ok := rw.(http.Hijacker) + if !ok { + return nil, nil, nil, fmt.Errorf("not a hijack connection") + } + + conn, _, err := hijacker.Hijack() + if err != nil { + return nil, nil, nil, err + } + + // set raw mode + conn.Write([]byte{}) + return conn, conn, func() error { + return conn.Close() + }, nil +} diff --git a/cli/exec.go b/cli/exec.go index a0a191303..f2007ade6 100644 --- a/cli/exec.go +++ b/cli/exec.go @@ -9,6 +9,7 @@ import ( "os" "github.com/alibaba/pouch/apis/types" + "github.com/alibaba/pouch/pkg/ioutils" "github.com/docker/docker/pkg/stdcopy" "github.com/sirupsen/logrus" @@ -145,9 +146,12 @@ func holdHijackConnection(ctx context.Context, conn net.Conn, reader *bufio.Read go func() { if stdin { io.Copy(conn, os.Stdin) + // close write if receive CTRL-D + if cw, ok := conn.(ioutils.CloseWriter); ok { + cw.CloseWrite() + } } - // TODO: close write side of conn close(stdinDone) }() diff --git a/cli/run.go b/cli/run.go index c56534c54..65955c57c 100644 --- a/cli/run.go +++ b/cli/run.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/alibaba/pouch/apis/types" + "github.com/alibaba/pouch/pkg/ioutils" "github.com/spf13/cobra" ) @@ -135,6 +136,10 @@ func (rc *RunCommand) runRun(args []string) error { }() go func() { io.Copy(conn, os.Stdin) + // close write if receive CTRL-D + if cw, ok := conn.(ioutils.CloseWriter); ok { + cw.CloseWrite() + } }() } diff --git a/cri/stream/remotecommand/attach.go b/cri/stream/remotecommand/attach.go index ba860b206..f7da6ae83 100644 --- a/cri/stream/remotecommand/attach.go +++ b/cri/stream/remotecommand/attach.go @@ -24,7 +24,6 @@ func ServeAttach(ctx context.Context, w http.ResponseWriter, req *http.Request, defer streamCtx.conn.Close() err := attacher.Attach(ctx, container, streamOpts, &Streams{ - StreamCh: make(chan struct{}, 1), StdinStream: streamCtx.stdinStream, StdoutStream: streamCtx.stdoutStream, StderrStream: streamCtx.stderrStream, diff --git a/cri/stream/remotecommand/httpstream.go b/cri/stream/remotecommand/httpstream.go index 91a60697c..8a6a0f5bd 100644 --- a/cri/stream/remotecommand/httpstream.go +++ b/cri/stream/remotecommand/httpstream.go @@ -28,8 +28,6 @@ type Options struct { // Streams contains all the streams used to stdio for // remote command execution. type Streams struct { - // Notified from StreamCh if streams broken. 
- StreamCh chan struct{} StdinStream io.ReadCloser StdoutStream io.WriteCloser StderrStream io.WriteCloser diff --git a/cri/stream/runtime.go b/cri/stream/runtime.go index dd91bd71d..8192fa949 100644 --- a/cri/stream/runtime.go +++ b/cri/stream/runtime.go @@ -7,11 +7,11 @@ import ( "io" "os/exec" "strings" - "time" apitypes "github.com/alibaba/pouch/apis/types" "github.com/alibaba/pouch/cri/stream/remotecommand" "github.com/alibaba/pouch/daemon/mgr" + pkgstreams "github.com/alibaba/pouch/pkg/streams" "github.com/sirupsen/logrus" ) @@ -52,17 +52,6 @@ func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []stri return 0, fmt.Errorf("failed to create exec for container %q: %v", containerID, err) } - attachConfig := &mgr.AttachConfig{ - Stdin: streamOpts.Stdin, - Streams: streams, - MuxDisabled: true, - } - - err = s.containerMgr.StartExec(ctx, execid, attachConfig) - if err != nil { - return 0, fmt.Errorf("failed to start exec for container %q: %v", containerID, err) - } - handleResizing(containerID, execid, resizeChan, func(size apitypes.ResizeOptions) { err := s.containerMgr.ResizeExec(ctx, execid, size) if err != nil { @@ -70,20 +59,24 @@ func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []stri } }) - // TODO Find a better way instead of the dead loop - var ei *apitypes.ContainerExecInspect - for { - ei, err = s.containerMgr.InspectExec(ctx, execid) - if err != nil { - return 0, fmt.Errorf("failed to inspect exec for container %q: %v", containerID, err) - } - // Loop until exec finished. - if !ei.Running { - break - } - time.Sleep(100 * time.Millisecond) + attachCfg := &pkgstreams.AttachConfig{ + UseStdin: createConfig.AttachStdin, + Stdin: streams.StdinStream, + UseStdout: createConfig.AttachStdout, + Stdout: streams.StdoutStream, + UseStderr: createConfig.AttachStderr, + Stderr: streams.StderrStream, + Terminal: createConfig.Tty, } + if err := s.containerMgr.StartExec(ctx, execid, attachCfg); err != nil { + return 0, fmt.Errorf("failed to exec for container %q: %v", containerID, err) + } + + ei, err := s.containerMgr.InspectExec(ctx, execid) + if err != nil { + return 0, fmt.Errorf("failed to inspect exec for container %q: %v", containerID, err) + } return uint32(ei.ExitCode), nil } @@ -110,20 +103,19 @@ func handleResizing(containerID, execID string, resizeChan <-chan apitypes.Resiz // Attach attaches to a running container. func (s *streamRuntime) Attach(ctx context.Context, containerID string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error { - attachConfig := &mgr.AttachConfig{ - Stdin: streamOpts.Stdin, - Stdout: streamOpts.Stdout, - Stderr: streamOpts.Stderr, - Streams: streams, + // TODO(fuweid): could we close stdin after stop attach? 
+ attachCfg := &pkgstreams.AttachConfig{ + UseStdin: streamOpts.Stdin, + Stdin: streams.StdinStream, + UseStdout: streamOpts.Stdout, + Stdout: streams.StdoutStream, + UseStderr: streamOpts.Stderr, + Stderr: streams.StderrStream, + Terminal: streamOpts.TTY, } - - err := s.containerMgr.Attach(ctx, containerID, attachConfig) - if err != nil { + if err := s.containerMgr.AttachContainerIO(ctx, containerID, attachCfg); err != nil { return fmt.Errorf("failed to attach to container %q: %v", containerID, err) } - - <-streams.StreamCh - return nil } diff --git a/cri/v1alpha1/cri.go b/cri/v1alpha1/cri.go index c08839598..b6a9b6bfb 100644 --- a/cri/v1alpha1/cri.go +++ b/cri/v1alpha1/cri.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "fmt" - "io" "os" "path" "path/filepath" @@ -23,6 +22,7 @@ import ( "github.com/alibaba/pouch/pkg/errtypes" "github.com/alibaba/pouch/pkg/meta" "github.com/alibaba/pouch/pkg/reference" + "github.com/alibaba/pouch/pkg/streams" "github.com/alibaba/pouch/pkg/utils" "github.com/alibaba/pouch/version" @@ -603,11 +603,7 @@ func (c *CriManager) CreateContainer(ctx context.Context, r *runtime.CreateConta // Get container log. if config.GetLogPath() != "" { logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath()) - // NOTE: If we attach log here, the IO of container will be created - // by this function first, so we should decide whether open the stdin - // here. It's weird actually, make it more elegant in the future. - err = c.attachLog(logPath, containerID, config.Stdin) - if err != nil { + if err := c.ContainerMgr.AttachCRILog(ctx, containerID, logPath); err != nil { return nil, err } } @@ -905,60 +901,37 @@ func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) ( defer cancel() createConfig := &apitypes.ExecCreateConfig{ - Cmd: r.GetCmd(), + Cmd: r.GetCmd(), + AttachStdout: true, + AttachStderr: true, } + execid, err := c.ContainerMgr.CreateExec(ctx, id, createConfig) if err != nil { return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err) } - reader, writer := io.Pipe() - defer writer.Close() - - attachConfig := &mgr.AttachConfig{ - Stdout: true, - Stderr: true, - Pipe: writer, - MuxDisabled: true, + stdoutBuf, stderrBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil) + attachCfg := &streams.AttachConfig{ + UseStdout: true, + Stdout: stdoutBuf, + UseStderr: true, + Stderr: stderrBuf, } - - err = c.ContainerMgr.StartExec(ctx, execid, attachConfig) - if err != nil { + if err := c.ContainerMgr.StartExec(ctx, execid, attachCfg); err != nil { return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err) } - readWaitCh := make(chan error, 1) - var recv bytes.Buffer - go func() { - defer reader.Close() - _, err = io.Copy(&recv, reader) - readWaitCh <- err - }() - - select { - case <-ctx.Done(): - //TODO maybe stop the execution? 
- return nil, fmt.Errorf("timeout %v exceeded", timeout) - case readWaitErr := <-readWaitCh: - if readWaitErr != nil { - return nil, fmt.Errorf("failed to read data from the pipe: %v", err) - } - execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid) - if err != nil { - return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err) - } - - var stderr []byte - if execConfig.Error != nil { - stderr = []byte(execConfig.Error.Error()) - } - - return &runtime.ExecSyncResponse{ - Stdout: recv.Bytes(), - Stderr: stderr, - ExitCode: int32(execConfig.ExitCode), - }, nil + execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid) + if err != nil { + return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err) } + + return &runtime.ExecSyncResponse{ + Stdout: stdoutBuf.Bytes(), + Stderr: stderrBuf.Bytes(), + ExitCode: int32(execConfig.ExitCode), + }, nil } // Exec prepares a streaming endpoint to execute a command in the container, and returns the address. diff --git a/cri/v1alpha1/cri_utils.go b/cri/v1alpha1/cri_utils.go index 54a6302ac..2bb2edfe0 100644 --- a/cri/v1alpha1/cri_utils.go +++ b/cri/v1alpha1/cri_utils.go @@ -869,25 +869,6 @@ func parseUserFromImageUser(id string) string { return id } -func (c *CriManager) attachLog(logPath string, containerID string, openStdin bool) error { - f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) - if err != nil { - return fmt.Errorf("failed to create container for opening log file failed: %v", err) - } - // Attach to the container to get log. - attachConfig := &mgr.AttachConfig{ - Stdin: openStdin, - Stdout: true, - Stderr: true, - CriLogFile: f, - } - err = c.ContainerMgr.Attach(context.Background(), containerID, attachConfig) - if err != nil { - return fmt.Errorf("failed to attach to container %q to get its log: %v", containerID, err) - } - return nil -} - func (c *CriManager) getContainerMetrics(ctx context.Context, meta *mgr.Container) (*runtime.ContainerStats, error) { var usedBytes, inodesUsed uint64 diff --git a/cri/v1alpha2/cri.go b/cri/v1alpha2/cri.go index cd4bfea3d..e77076d3b 100644 --- a/cri/v1alpha2/cri.go +++ b/cri/v1alpha2/cri.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "fmt" - "io" "os" "path" "path/filepath" @@ -27,6 +26,7 @@ import ( "github.com/alibaba/pouch/pkg/errtypes" "github.com/alibaba/pouch/pkg/meta" "github.com/alibaba/pouch/pkg/reference" + pkgstreams "github.com/alibaba/pouch/pkg/streams" "github.com/alibaba/pouch/pkg/utils" util_metrics "github.com/alibaba/pouch/pkg/utils/metrics" "github.com/alibaba/pouch/version" @@ -788,11 +788,7 @@ func (c *CriManager) CreateContainer(ctx context.Context, r *runtime.CreateConta // Get container log. if config.GetLogPath() != "" { logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath()) - // NOTE: If we attach log here, the IO of container will be created - // by this function first, so we should decide whether open the stdin - // here. It's weird actually, make it more elegant in the future. 
- err = c.attachLog(logPath, containerID, config.Stdin) - if err != nil { + if err := c.ContainerMgr.AttachCRILog(ctx, containerID, logPath); err != nil { return nil, err } } @@ -1180,53 +1176,27 @@ func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) ( return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err) } - reader, writer := io.Pipe() - defer writer.Close() - - attachConfig := &mgr.AttachConfig{ - Stdout: true, - Stderr: true, - Pipe: writer, - MuxDisabled: true, + stdoutBuf, stderrBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil) + attachCfg := &pkgstreams.AttachConfig{ + UseStdout: true, + Stdout: stdoutBuf, + UseStderr: true, + Stderr: stderrBuf, } - - err = c.ContainerMgr.StartExec(ctx, execid, attachConfig) - if err != nil { + if err := c.ContainerMgr.StartExec(ctx, execid, attachCfg); err != nil { return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err) } - readWaitCh := make(chan error, 1) - var recv bytes.Buffer - go func() { - defer reader.Close() - _, err = io.Copy(&recv, reader) - readWaitCh <- err - }() - - select { - case <-ctx.Done(): - //TODO maybe stop the execution? - return nil, fmt.Errorf("timeout %v exceeded", timeout) - case readWaitErr := <-readWaitCh: - if readWaitErr != nil { - return nil, fmt.Errorf("failed to read data from the pipe: %v", err) - } - execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid) - if err != nil { - return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err) - } - - var stderr []byte - if execConfig.Error != nil { - stderr = []byte(execConfig.Error.Error()) - } - - return &runtime.ExecSyncResponse{ - Stdout: recv.Bytes(), - Stderr: stderr, - ExitCode: int32(execConfig.ExitCode), - }, nil + execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid) + if err != nil { + return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err) } + + return &runtime.ExecSyncResponse{ + Stdout: stdoutBuf.Bytes(), + Stderr: stderrBuf.Bytes(), + ExitCode: int32(execConfig.ExitCode), + }, nil } // Exec prepares a streaming endpoint to execute a command in the container, and returns the address. diff --git a/cri/v1alpha2/cri_utils.go b/cri/v1alpha2/cri_utils.go index 39cc6f074..6f991cc48 100644 --- a/cri/v1alpha2/cri_utils.go +++ b/cri/v1alpha2/cri_utils.go @@ -921,25 +921,6 @@ func parseUserFromImageUser(id string) string { return id } -func (c *CriManager) attachLog(logPath string, containerID string, openStdin bool) error { - f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) - if err != nil { - return fmt.Errorf("failed to create container for opening log file failed: %v", err) - } - // Attach to the container to get log. 
- attachConfig := &mgr.AttachConfig{ - Stdin: openStdin, - Stdout: true, - Stderr: true, - CriLogFile: f, - } - err = c.ContainerMgr.Attach(context.Background(), containerID, attachConfig) - if err != nil { - return fmt.Errorf("failed to attach to container %q to get its log: %v", containerID, err) - } - return nil -} - func (c *CriManager) getContainerMetrics(ctx context.Context, meta *mgr.Container) (*runtime.ContainerStats, error) { var usedBytes, inodesUsed uint64 diff --git a/ctrd/container.go b/ctrd/container.go index 4eeb568b7..bad29d987 100644 --- a/ctrd/container.go +++ b/ctrd/container.go @@ -6,23 +6,26 @@ import ( "fmt" "io" "runtime" + "strings" + "sync" "syscall" "time" "github.com/alibaba/pouch/apis/types" "github.com/alibaba/pouch/daemon/containerio" "github.com/alibaba/pouch/pkg/errtypes" + "github.com/alibaba/pouch/pkg/ioutils" "github.com/containerd/containerd" containerdtypes "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/archive" + "github.com/containerd/containerd/cio" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/leases" "github.com/containerd/containerd/linux/runctypes" "github.com/containerd/containerd/oci" - "github.com/docker/docker/pkg/stdcopy" imagespec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -89,20 +92,33 @@ func (c *Client) execContainer(ctx context.Context, process *Process) error { return err } - var ( - pStdout io.Writer = process.IO.Stdout - pStderr io.Writer = process.IO.Stderr - ) + closeStdinCh := make(chan struct{}) - if !process.P.Terminal && !process.IO.MuxDisabled { - pStdout = stdcopy.NewStdWriter(pStdout, stdcopy.Stdout) - pStderr = stdcopy.NewStdWriter(pStderr, stdcopy.Stderr) - } + // make sure the closeStdinCh has been closed. 
+ defer func() { + close(closeStdinCh) + }() - io := containerio.NewIOWithTerminal(process.IO.Stdin, pStdout, pStderr, process.P.Terminal, process.IO.Stdin != nil) + var ( + cntrID, execID = pack.container.ID(), process.ExecID + withStdin, withTerminal = process.IO.Stream().Stdin() != nil, process.P.Terminal + ) // create exec process in container - execProcess, err := pack.task.Exec(ctx, process.ExecID, process.P, io) + execProcess, err := pack.task.Exec(ctx, process.ExecID, process.P, func(_ string) (cio.IO, error) { + logrus.WithFields( + logrus.Fields{ + "container": cntrID, + "process": execID, + }, + ).Debugf("creating cio (withStdin=%v, withTerminal=%v)", withStdin, withTerminal) + + fifoset, err := containerio.NewCioFIFOSet(execID, withStdin, withTerminal) + if err != nil { + return nil, err + } + return c.createIO(fifoset, cntrID, execID, closeStdinCh, process.IO.InitContainerIO) + }) if err != nil { return errors.Wrap(err, "failed to exec process") } @@ -112,24 +128,32 @@ func (c *Client) execContainer(ctx context.Context, process *Process) error { if err != nil { return errors.Wrap(err, "failed to exec process") } - fail := make(chan error, 1) - defer close(fail) + + errCh := make(chan error, 1) + defer close(errCh) go func() { - status := <-exitStatus - msg := &Message{ - err: status.Error(), - exitCode: status.ExitCode(), - exitTime: status.ExitTime(), + var msg *Message + + if startErr := <-errCh; startErr != nil { + msg = &Message{ + err: startErr, + exitCode: 126, + exitTime: time.Now().UTC(), + } } - if err := <-fail; err != nil { - msg.err = err - // exit code should not be zero when exec get failed. - if msg.exitCode == 0 { - msg.exitCode = 126 + // success to start which means the cmd is valid and wait + // for process. + if msg == nil { + status := <-exitStatus + msg = &Message{ + err: status.Error(), + exitCode: status.ExitCode(), + exitTime: status.ExitTime(), } } + // XXX: if exec process get run, io should be closed in this function, for _, hook := range c.hooks { if err := hook(process.ExecID, msg); err != nil { @@ -146,9 +170,9 @@ func (c *Client) execContainer(ctx context.Context, process *Process) error { // start the exec process if err := execProcess.Start(ctx); err != nil { - fail <- err + errCh <- err + return err } - return nil } @@ -250,7 +274,7 @@ func (c *Client) RecoverContainer(ctx context.Context, id string, io *containeri } // recoverContainer reload the container from metadata and watch it, if program be restarted. 
-func (c *Client) recoverContainer(ctx context.Context, id string, io *containerio.IO) error { +func (c *Client) recoverContainer(ctx context.Context, id string, io *containerio.IO) (err0 error) { wrapperCli, err := c.Get(ctx) if err != nil { return fmt.Errorf("failed to get a containerd grpc client: %v", err) @@ -269,7 +293,9 @@ func (c *Client) recoverContainer(ctx context.Context, id string, io *containeri return errors.Wrapf(err, "failed to load container(%s)", id) } - task, err := lc.Task(ctx, containerio.WithAttach(io.Stdin, io.Stdout, io.Stderr)) + task, err := lc.Task(ctx, func(fset *cio.FIFOSet) (cio.IO, error) { + return c.attachIO(fset, io.InitContainerIO) + }) if err != nil { if !errdefs.IsNotFound(err) { return errors.Wrap(err, "failed to get task") @@ -534,8 +560,6 @@ func (c *Client) createContainer(ctx context.Context, ref, id, checkpointDir str func (c *Client) createTask(ctx context.Context, id, checkpointDir string, container containerd.Container, cc *Container, client *containerd.Client) (p *containerPack, err0 error) { var pack *containerPack - io := containerio.NewIOWithTerminal(cc.IO.Stdin, cc.IO.Stdout, cc.IO.Stderr, cc.Spec.Process.Terminal, cc.IO.Stdin != nil) - checkpoint, err := createCheckpointDescriptor(ctx, checkpointDir, client) if err != nil { return pack, errors.Wrapf(err, "failed to create checkpoint descriptor") @@ -550,8 +574,24 @@ func (c *Client) createTask(ctx context.Context, id, checkpointDir string, conta } }() + var ( + cntrID, execID = id, id + withStdin, withTerminal = cc.IO.Stream().Stdin() != nil, cc.Spec.Process.Terminal + closeStdinCh = make(chan struct{}) + ) + // create task - task, err := container.NewTask(ctx, io, withCheckpointOpt(checkpoint)) + task, err := container.NewTask(ctx, func(_ string) (cio.IO, error) { + logrus.WithField("container", cntrID).Debugf("creating cio (withStdin=%v, withTerminal=%v)", withStdin, withTerminal) + + fifoset, err := containerio.NewCioFIFOSet(execID, withStdin, withTerminal) + if err != nil { + return nil, err + } + return c.createIO(fifoset, cntrID, execID, closeStdinCh, cc.IO.InitContainerIO) + }, withCheckpointOpt(checkpoint)) + close(closeStdinCh) + if err != nil { return pack, errors.Wrapf(err, "failed to create task for container(%s)", id) } @@ -791,3 +831,107 @@ func withCheckpointOpt(checkpoint *containerdtypes.Descriptor) containerd.NewTas return nil } } + +// InitStdio allows caller to handle any initialize job. +type InitStdio func(dio *containerio.DirectIO) (cio.IO, error) + +func (c *Client) createIO(fifoSet *containerio.CioFIFOSet, cntrID, procID string, closeStdinCh <-chan struct{}, initstdio InitStdio) (cio.IO, error) { + cdio, err := containerio.NewDirectIO(context.Background(), fifoSet) + if err != nil { + return nil, err + } + + if cdio.Stdin != nil { + var ( + errClose error + stdinOnce sync.Once + ) + oldStdin := cdio.Stdin + cdio.Stdin = ioutils.NewWriteCloserWrapper(oldStdin, func() error { + stdinOnce.Do(func() { + errClose = oldStdin.Close() + + // Both the caller and container/exec process holds write side pipe + // for the stdin. When the caller closes the write pipe, the process doesn't + // exit until the caller calls the CloseIO. + go func() { + <-closeStdinCh + if err := c.closeStdinIO(cntrID, procID); err != nil { + // TODO(fuweid): for the CloseIO grpc call, the containerd doesn't + // return correct status code if the process doesn't exist. + // for the case, we should use strings.Contains to reduce warning + // log. it will be fixed in containerd#2747. 
+ if !errdefs.IsNotFound(err) && !strings.Contains(err.Error(), "not found") { + logrus.WithError(err).Warnf("failed to close stdin containerd IO (container:%v, process:%v", cntrID, procID) + } + } + }() + }) + return errClose + }) + } + + cntrio, err := initstdio(cdio) + if err != nil { + cdio.Cancel() + cdio.Close() + return nil, err + } + return cntrio, nil +} + +func (c *Client) attachIO(fifoSet *cio.FIFOSet, initstdio InitStdio) (cio.IO, error) { + if fifoSet == nil { + return nil, fmt.Errorf("cannot attach to existing fifos") + } + + cdio, err := containerio.NewDirectIO(context.Background(), &containerio.CioFIFOSet{ + Config: cio.Config{ + Terminal: fifoSet.Terminal, + Stdin: fifoSet.In, + Stdout: fifoSet.Out, + Stderr: fifoSet.Err, + }, + }) + if err != nil { + return nil, err + } + + cntrio, err := initstdio(cdio) + if err != nil { + cdio.Cancel() + cdio.Close() + return nil, err + } + return cntrio, nil +} + +// closeStdinIO is used to close the write side of fifo in containerd-shim. +// +// NOTE: we should use client to make rpc call directly. if we retrieve it from +// watch, it might return 404 because the pack is saved into cache after Start. +func (c *Client) closeStdinIO(containerID, processID string) error { + ctx := context.Background() + wrapperCli, err := c.Get(ctx) + if err != nil { + return fmt.Errorf("failed to get a containerd grpc client: %v", err) + } + + cli := wrapperCli.client + cntr, err := cli.LoadContainer(ctx, containerID) + if err != nil { + return err + } + + t, err := cntr.Task(ctx, nil) + if err != nil { + return err + } + + p, err := t.LoadProcess(ctx, processID, nil) + if err != nil { + return err + } + + return p.CloseIO(ctx, containerd.WithStdinCloser) +} diff --git a/ctrd/watch.go b/ctrd/watch.go index 9f8048136..cc8bcedc5 100644 --- a/ctrd/watch.go +++ b/ctrd/watch.go @@ -92,14 +92,6 @@ func (w *watch) add(pack *containerPack) { // not the grpc client executing this parts of code. pack.client.Produce(1) - if _, err := pack.task.Delete(context.Background()); err != nil { - logrus.Errorf("failed to delete task, container id: %s: %v", pack.id, err) - } - - if err := pack.container.Delete(context.Background()); err != nil { - logrus.Errorf("failed to delete container, container id: %s: %v", pack.id, err) - } - msg := &Message{ err: status.Error(), exitCode: status.ExitCode(), @@ -115,6 +107,14 @@ func (w *watch) add(pack *containerPack) { } } + // NOTE: we should delete task/container after update the status, for example, status code. + if _, err := pack.task.Delete(context.Background()); err != nil { + logrus.Errorf("failed to delete task, container id: %s: %v", pack.id, err) + } + + if err := pack.container.Delete(context.Background()); err != nil { + logrus.Errorf("failed to delete container, container id: %s: %v", pack.id, err) + } pack.ch <- msg }(w, pack) diff --git a/daemon/containerio/backend.go b/daemon/containerio/backend.go deleted file mode 100644 index f3c2eb477..000000000 --- a/daemon/containerio/backend.go +++ /dev/null @@ -1,33 +0,0 @@ -package containerio - -import ( - "io" -) - -var backendFactorys []func() Backend - -// Backend defines the real output/input of the container's stdio. -type Backend interface { - // Name defines the backend's name. - Name() string - - // Init initializes the backend io. - Init(opt *Option) error - - // Out returns the stdout. - Out() io.Writer - - // In returns the stdin. - In() io.Reader - - // Err returns the stderr. - Err() io.Writer - - // Close closes the io. 
- Close() error -} - -// Register adds a backend. -func Register(create func() Backend) { - backendFactorys = append(backendFactorys, create) -} diff --git a/daemon/containerio/cio.go b/daemon/containerio/cio.go index abcf46058..ad470cd5d 100644 --- a/daemon/containerio/cio.go +++ b/daemon/containerio/cio.go @@ -2,7 +2,6 @@ package containerio import ( "context" - "fmt" "io" "io/ioutil" "os" @@ -10,221 +9,185 @@ import ( "sync" "syscall" - containerdio "github.com/containerd/containerd/cio" + "github.com/containerd/containerd/cio" "github.com/containerd/fifo" + "github.com/sirupsen/logrus" ) -// cio is a basic container IO implementation. -type cio struct { - config containerdio.Config - - closer *wgCloser +// CioFIFOSet holds the fifo pipe for containerd shim. +// +// It will be replaced by the containerd@v1.2 +type CioFIFOSet struct { + cio.Config + closeFn func() error } -func (c *cio) Config() containerdio.Config { - return c.config -} - -func (c *cio) Cancel() { - if c.closer == nil { - return +// Close will remove all the relatived files. +func (c *CioFIFOSet) Close() error { + if c.closeFn != nil { + return c.closeFn() } - c.closer.Cancel() -} - -func (c *cio) Wait() { - if c.closer == nil { - return - } - c.closer.Wait() -} - -func (c *cio) Close() error { - if c.closer == nil { - return nil - } - return c.closer.Close() + return nil } -// NewFifos returns a new set of fifos for the task -func NewFifos(id string, stdin bool) (*containerdio.FIFOSet, error) { +// NewCioFIFOSet prepares fifo files. +func NewCioFIFOSet(processID string, withStdin bool, withTerminal bool) (*CioFIFOSet, error) { root := "/run/containerd/fifo" if err := os.MkdirAll(root, 0700); err != nil { return nil, err } - dir, err := ioutil.TempDir(root, "") + + fifoDir, err := ioutil.TempDir(root, "") if err != nil { return nil, err } - fifos := &containerdio.FIFOSet{ - Dir: dir, - In: filepath.Join(dir, id+"-stdin"), - Out: filepath.Join(dir, id+"-stdout"), - Err: filepath.Join(dir, id+"-stderr"), + + cfg := cio.Config{ + Terminal: withTerminal, + Stdout: filepath.Join(fifoDir, processID+"-stdout"), + } + + if withStdin { + cfg.Stdin = filepath.Join(fifoDir, processID+"-stdin") } - if !stdin { - fifos.In = "" + if !withTerminal { + cfg.Stderr = filepath.Join(fifoDir, processID+"-stderr") } - return fifos, nil + closeFn := func() error { + err := os.RemoveAll(fifoDir) + if err != nil { + logrus.WithError(err).Warnf("failed to remove process(id=%v) fifo dir", processID) + } + return err + } + + return &CioFIFOSet{ + Config: cfg, + closeFn: closeFn, + }, nil } -type ioSet struct { - in io.Reader - out, err io.Writer +// TODO(fuweid): containerIO will be removed when update vendor to containerd@v1.2. 
+type containerIO struct { + config cio.Config + wg *sync.WaitGroup + closers []io.Closer + cancel context.CancelFunc } -type wgCloser struct { - wg *sync.WaitGroup - dir string - set []io.Closer - cancel context.CancelFunc +func (c *containerIO) Config() cio.Config { + return c.config } -func (g *wgCloser) Wait() { - g.wg.Wait() +func (c *containerIO) Wait() { + if c.wg != nil { + c.wg.Wait() + } } -func (g *wgCloser) Close() error { - for _, f := range g.set { - f.Close() +func (c *containerIO) Close() error { + var lastErr error + for _, closer := range c.closers { + if closer == nil { + continue + } + if err := closer.Close(); err != nil { + lastErr = err + } } - if g.dir != "" { - return os.RemoveAll(g.dir) + return lastErr +} + +func (c *containerIO) Cancel() { + if c.cancel != nil { + c.cancel() } - return nil } -func (g *wgCloser) Cancel() { - g.cancel() +// TODO(fuweid): pipes will be removed when update vendor to containerd@v1.2. +type pipes struct { + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser +} + +func (p *pipes) Closers() []io.Closer { + return []io.Closer{p.Stdout, p.Stderr, p.Stdin} } -func copyIO(fifos *containerdio.FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { +// DirectIO allows task IO to be handled externally by the caller. +// +// TODO(fuweid): DirectIO will be removed when update vendor to containerd@v1.2. +type DirectIO struct { + pipes + containerIO +} + +var _ cio.IO = &DirectIO{} + +// NewDirectIO returns an IO implementation that exposes the IO streams as +// io.ReadCloser and io.WriteCloser. +// +// TODO(fuweid): NewDirectIO will be removed when update vendor to containerd@v1.2. +func NewDirectIO(ctx context.Context, fifos *CioFIFOSet) (*DirectIO, error) { + ctx, cancel := context.WithCancel(ctx) + pipes, err := openPipes(ctx, fifos) + if err != nil { + cancel() + return nil, err + } + + return &DirectIO{ + pipes: pipes, + containerIO: containerIO{ + config: fifos.Config, + closers: append(pipes.Closers(), fifos), + cancel: cancel, + }, + }, nil +} + +func openPipes(ctx context.Context, fifos *CioFIFOSet) (_ pipes, err0 error) { var ( - f io.ReadWriteCloser - set []io.Closer - ctx, cancel = context.WithCancel(context.Background()) - wg = &sync.WaitGroup{} + err error + p pipes ) + defer func() { - if err != nil { - for _, f := range set { - f.Close() - } - cancel() + if err0 != nil { + fifos.Close() } }() - // if fifos directory is not exist, create fifo will fails, - // also in case of fifo directory lost in container recovery process. 
- if _, err := os.Stat(fifos.Dir); err != nil && os.IsNotExist(err) { - os.MkdirAll(fifos.Dir, 0700) - } - - if fifos.In != "" { - if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, err + if fifos.Stdin != "" { + if p.Stdin, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + return p, err } - set = append(set, f) - go func(w io.WriteCloser) { - io.Copy(w, ioset.in) - w.Close() - }(f) - } - if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, err - } - set = append(set, f) - wg.Add(1) - go func(r io.ReadCloser) { - io.Copy(ioset.out, r) - r.Close() - wg.Done() - }(f) - - if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, err - } - set = append(set, f) - - if !tty { - wg.Add(1) - go func(r io.ReadCloser) { - io.Copy(ioset.err, r) - r.Close() - wg.Done() - }(f) + defer func() { + if err != nil && p.Stdin != nil { + p.Stdin.Close() + } + }() } - return &wgCloser{ - wg: wg, - dir: fifos.Dir, - set: set, - cancel: cancel, - }, nil -} -// NewIOWithTerminal creates a new io set with the provied io.Reader/Writers for use with a terminal -func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool, stdinEnable bool) containerdio.Creation { - return func(id string) (_ containerdio.IO, err error) { - paths, err := NewFifos(id, stdinEnable) - if err != nil { - return nil, err + if fifos.Stdout != "" { + if p.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + return p, err } defer func() { - if err != nil && paths.Dir != "" { - os.RemoveAll(paths.Dir) + if err != nil && p.Stdout != nil { + p.Stdout.Close() } }() - cfg := containerdio.Config{ - Terminal: terminal, - Stdout: paths.Out, - Stderr: paths.Err, - Stdin: paths.In, - } - i := &cio{config: cfg} - set := &ioSet{ - in: stdin, - out: stdout, - err: stderr, - } - closer, err := copyIO(paths, set, cfg.Terminal) - if err != nil { - return nil, err - } - i.closer = closer - return i, nil } -} -// WithAttach attaches the existing io for a task to the provided io.Reader/Writers -func WithAttach(stdin io.Reader, stdout, stderr io.Writer) containerdio.Attach { - return func(paths *containerdio.FIFOSet) (containerdio.IO, error) { - if paths == nil { - return nil, fmt.Errorf("cannot attach to existing fifos") - } - // judge stdin is initial, make logic same with copyIO. 
- if i, ok := stdin.(*ContainerIO); (ok && i == nil) || (stdin == nil) { - paths.In = "" - } - cfg := containerdio.Config{ - Terminal: paths.Terminal, - Stdout: paths.Out, - Stderr: paths.Err, - Stdin: paths.In, - } - i := &cio{config: cfg} - set := &ioSet{ - in: stdin, - out: stdout, - err: stderr, - } - closer, err := copyIO(paths, set, cfg.Terminal) - if err != nil { - return nil, err + if fifos.Stderr != "" { + if p.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + return p, err } - i.closer = closer - return i, nil } + return p, nil } diff --git a/daemon/containerio/container_io.go b/daemon/containerio/container_io.go deleted file mode 100644 index 7c37fdd95..000000000 --- a/daemon/containerio/container_io.go +++ /dev/null @@ -1,378 +0,0 @@ -package containerio - -import ( - "fmt" - "io" - - "github.com/alibaba/pouch/pkg/ringbuffer" - - "github.com/sirupsen/logrus" -) - -const ( - stdout stdioType = iota - stderr - stdin - discard -) - -type stdioType int - -func (t stdioType) String() string { - switch t { - case stdout: - return "STDOUT" - case stdin: - return "STDIN" - case stderr: - return "STDERR" - case discard: - return "DISCARD" - } - return "INVALID" -} - -// IO wraps the three container's ios of stdout, stderr, stdin. -type IO struct { - Stdout *ContainerIO - Stderr *ContainerIO - Stdin *ContainerIO - - // For IO backend like http, we need to mux stdout & stderr - // if terminal is disabled. - // But for other IO backend, it is not necessary. - // So we should make it configurable. - MuxDisabled bool -} - -// NewIO creates the container's ios of stdout, stderr, stdin. -func NewIO(opt *Option) *IO { - backends := createBackend(opt) - - i := &IO{ - Stdout: create(opt, stdout, backends), - Stderr: create(opt, stderr, backends), - MuxDisabled: opt.muxDisabled, - } - - if opt.stdin { - i.Stdin = create(opt, stdin, backends) - } - - return i -} - -// AddBackend adds more backends to container's stdio. -func (io *IO) AddBackend(opt *Option) { - backends := createBackend(opt) - - for t, s := range map[stdioType]*ContainerIO{ - stdout: io.Stdout, - stderr: io.Stderr, - } { - s.add(opt, t, backends) - } - - if opt.stdin && io.Stdin != nil { - io.Stdin.add(opt, stdin, backends) - } -} - -// Close closes the container's io. -func (io *IO) Close() error { - io.Stderr.Close() - io.Stdout.Close() - if io.Stdin != nil { - io.Stdin.Close() - } - return nil -} - -// ContainerIO used to control the container's stdio. -type ContainerIO struct { - Option - backends []containerBackend - total int64 - typ stdioType - closed bool - // The stdin of all backends should put into ring first. 
- ring *ringbuffer.RingBuffer -} - -func (cio *ContainerIO) add(opt *Option, typ stdioType, backends map[string]containerBackend) { - if typ == stdin { - for _, b := range backends { - if b.backend.Name() == opt.stdinBackend { - cio.backends = append(cio.backends, b) - go func(b containerBackend) { - cio.converge(b.backend.Name(), opt.id, b.backend.In()) - b.backend.Close() - }(b) - break - } - } - } else { - for _, b := range backends { - cio.backends = append(cio.backends, b) - } - } -} - -func create(opt *Option, typ stdioType, backends map[string]containerBackend) *ContainerIO { - io := &ContainerIO{ - total: 0, - typ: typ, - closed: false, - Option: *opt, - } - - if typ == stdin { - io.ring = ringbuffer.New(-1) - for _, b := range backends { - if b.backend.Name() == opt.stdinBackend { - io.backends = append(io.backends, b) - go func(b containerBackend) { - // For backend with stdin, close it if stdin finished. - io.converge(b.backend.Name(), opt.id, b.backend.In()) - b.backend.Close() - }(b) - break - } - } - } else { - for _, b := range backends { - io.backends = append(io.backends, b) - } - } - - return io -} - -func createBackend(opt *Option) map[string]containerBackend { - backends := make(map[string]containerBackend) - - for _, create := range backendFactorys { - backend := create() - if _, ok := opt.backends[backend.Name()]; !ok { - continue - } - - if err := backend.Init(opt); err != nil { - // FIXME skip the backend. - logrus.Errorf("failed to initialize backend: %s, id: %s, %v", backend.Name(), opt.id, err) - continue - } - - backends[backend.Name()] = containerBackend{ - backend: backend, - outRing: ringbuffer.New(-1), - errRing: ringbuffer.New(-1), - } - } - - // start to subscribe stdout and stderr ring buffer. - for _, b := range backends { - - // the goroutine don't exit forever. - go func(b containerBackend) { - subscribe(b.backend.Name(), opt.id, b.outRing, b.backend.Out()) - }(b) - go func(b containerBackend) { - subscribe(b.backend.Name(), opt.id, b.errRing, b.backend.Err()) - }(b) - } - - return backends -} - -// OpenStdin returns open container's stdin or not. -func (cio *ContainerIO) OpenStdin() bool { - if cio.typ != stdin { - return false - } - if cio.closed { - return false - } - return len(cio.backends) != 0 -} - -// Read implements the standard Read interface. -func (cio *ContainerIO) Read(p []byte) (int, error) { - if cio.typ != stdin { - return 0, fmt.Errorf("invalid container io type: %s, id: %s", cio.typ, cio.id) - } - if cio.closed { - return 0, fmt.Errorf("container io is closed") - } - - value, _ := cio.ring.Pop() - data, ok := value.([]byte) - if !ok { - return 0, nil - } - n := copy(p, data) - - return n, nil -} - -// Write implements the standard Write interface. -func (cio *ContainerIO) Write(data []byte) (int, error) { - if cio.typ == stdin { - return 0, fmt.Errorf("invalid container io type: %s, id: %s", cio.typ, cio.id) - } - if cio.closed { - return 0, fmt.Errorf("container io is closed") - } - - if cio.typ == discard { - return len(data), nil - } - - // FIXME(fuwei): In case that the data slice is reused by the writer, - // we should copy the data before we push it into the ringbuffer. - // The previous data shares the same address with the coming data. - // If we don't copy the data and the previous data isn't consumed by - // ringbuf pop action, the incoming data will override the previous data - // in the ringbuf. - // - // However, copy data maybe impact the performance. We need to reconsider - // other better way to handle the IO. 
- copyData := make([]byte, len(data)) - copy(copyData, data) - - switch cio.typ { - case stdout: - for _, b := range cio.backends { - cover, err := b.outRing.Push(copyData) - // skip if it is closed ringbuffer - if err != nil { - continue - } - - if cover { - logrus.Warnf("cover stdout data, backend: %s, id: %s", b.backend.Name(), cio.id) - } - } - case stderr: - for _, b := range cio.backends { - cover, err := b.errRing.Push(copyData) - // skip if it is closed ringbuffer - if err != nil { - continue - } - - if cover { - logrus.Warnf("cover stderr data, backend: %s, id: %s", b.backend.Name(), cio.id) - } - } - } - - return len(data), nil -} - -// Close implements the standard Close interface. -func (cio *ContainerIO) Close() error { - // FIXME(fuwei): stdin should be treated like stdout, stderr. - if cio.typ == stdin && cio.ring != nil { - // NOTE: let converge goroutine quit - cio.ring.Close() - } - - for _, b := range cio.backends { - // we need to close ringbuf before close backend, because close ring will flush - // the remain data into backend. - name := b.backend.Name() - - b.outRing.Close() - b.errRing.Close() - if err := b.drainRingBuffer(); err != nil { - logrus.Warnf("failed to drain ringbuffer for backend: %s, id: %s", name, cio.id) - } - - b.backend.Close() - logrus.Infof("close containerio backend: %s, id: %s", name, cio.id) - } - - cio.closed = true - return nil -} - -// FIXME(fuwei): just one ringbuffer for one backend -type containerBackend struct { - backend Backend - outRing *ringbuffer.RingBuffer - errRing *ringbuffer.RingBuffer -} - -func (cb *containerBackend) drainRingBuffer() error { - for _, item := range []struct { - data []interface{} - w io.Writer - }{ - {data: cb.outRing.Drain(), w: cb.backend.Out()}, - {data: cb.errRing.Drain(), w: cb.backend.Err()}, - } { - for _, value := range item.data { - if b, ok := value.([]byte); ok { - if _, err := item.w.Write(b); err != nil { - return err - } - } - } - } - return nil -} - -// subscribe be called in a groutine. -func subscribe(name, id string, ring *ringbuffer.RingBuffer, out io.Writer) { - logrus.Infof("start to subscribe io, backend: %s, id: %s", name, id) - - for { - value, err := ring.Pop() - // break loop if the ringbuffer has been closed - if err != nil { - break - } - - if b, ok := value.([]byte); ok { - if _, err := out.Write(b); err != nil { - logrus.Errorf("failed to write containerio backend: %s, id: %s, %v", name, id, err) - } - } - } - logrus.Infof("finished to subscribe io, backend: %s, id: %s", name, id) -} - -// converge be called in a goroutine. -func (cio *ContainerIO) converge(name, id string, in io.Reader) { - // TODO: we should implement this function more elegant and robust. - logrus.Infof("start to converge io, backend: %s, id: %s", name, id) - - data := make([]byte, 128) - for { - n, err := in.Read(data) - if err != nil { - logrus.Errorf("failed to read from backend: %s, id: %s, %v", name, id, err) - break - } - - // FIXME(fuwei): In case that the data slice is reused by the writer, - // we should copy the data before we push it into the ringbuffer. - // The previous data shares the same address with the coming data. - // If we don't copy the data and the previous data isn't consumed by - // ringbuf pop action, the incoming data will override the previous data - // in the ringbuf. 
- copyData := make([]byte, n) - copy(copyData, data[:n]) - - cover, err := cio.ring.Push(copyData) - if err != nil { - break - } - - if cover { - logrus.Warnf("cover data, backend: %s, id: %s", name, id) - } - } - logrus.Infof("finished to converge io, backend: %s, id: %s", name, id) -} diff --git a/daemon/containerio/cri_log_file.go b/daemon/containerio/cri_log_file.go deleted file mode 100644 index d1e242008..000000000 --- a/daemon/containerio/cri_log_file.go +++ /dev/null @@ -1,121 +0,0 @@ -package containerio - -import ( - "bufio" - "bytes" - "io" - "os" - "time" - - "github.com/sirupsen/logrus" - "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" -) - -const ( - // delimiter used in cri logging format. - delimiter = ' ' - // eol is end-of-line. - eol = '\n' - // timestampFormat is the timestamp format used in cri logging format. - timestampFormat = time.RFC3339Nano - // pipeBufSize is the system PIPE_BUF size, on linux it is 4096 bytes. - pipeBufSize = 4096 - // bufSize is the size of the read buffer. - bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - 2 /*2 delimiter*/ - 1 /*eol*/ -) - -// StreamType is the type of the stream. -type StreamType string - -const ( - // Stdin stream type. - Stdin StreamType = "stdin" - // Stdout stream type. - Stdout StreamType = "stdout" - // Stderr stream type. - Stderr StreamType = "stderr" -) - -func init() { - Register(func() Backend { - return &criLogFile{} - }) -} - -type criLogFile struct { - file *os.File - outPipeWriter *io.PipeWriter - outPipeReader *io.PipeReader - errPipeWriter *io.PipeWriter - errPipeReader *io.PipeReader - closed bool -} - -func (c *criLogFile) Name() string { - return "cri-log-file" -} - -func (c *criLogFile) Init(opt *Option) error { - c.file = opt.criLogFile - c.outPipeReader, c.outPipeWriter = io.Pipe() - c.errPipeReader, c.errPipeWriter = io.Pipe() - go redirectLogs(c.file, c.outPipeReader, Stdout) - go redirectLogs(c.file, c.errPipeReader, Stderr) - return nil -} - -func redirectLogs(w io.WriteCloser, r io.ReadCloser, stream StreamType) { - defer r.Close() - defer w.Close() - streamBytes := []byte(stream) - delimiterBytes := []byte{delimiter} - partialBytes := []byte(runtime.LogTagPartial) - fullBytes := []byte(runtime.LogTagFull) - br := bufio.NewReaderSize(r, bufSize) - for { - lineBytes, isPrefix, err := br.ReadLine() - if err != nil { - if err == io.EOF { - logrus.Infof("finish redirecting log file") - } else { - logrus.Errorf("failed to redirect log file: %v", err) - } - return - } - tagBytes := fullBytes - if isPrefix { - tagBytes = partialBytes - } - timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano) - data := bytes.Join([][]byte{timestampBytes, streamBytes, tagBytes, lineBytes}, delimiterBytes) - data = append(data, eol) - // TODO: maybe lock here? - _, err = w.Write(data) - if err != nil { - logrus.Errorf("failed to write %q log to log file: %v", stream, err) - } - } -} - -func (c *criLogFile) Out() io.Writer { - return c.outPipeWriter -} - -func (c *criLogFile) Err() io.Writer { - return c.errPipeWriter -} - -func (c *criLogFile) In() io.Reader { - // Log doesn't need stdin. 
- return nil -} - -func (c *criLogFile) Close() error { - if c.closed { - return nil - } - c.closed = true - c.outPipeWriter.Close() - c.errPipeWriter.Close() - return nil -} diff --git a/daemon/containerio/discard.go b/daemon/containerio/discard.go deleted file mode 100644 index 4102a2326..000000000 --- a/daemon/containerio/discard.go +++ /dev/null @@ -1,47 +0,0 @@ -package containerio - -import ( - "io" -) - -func init() { - Register(func() Backend { - return &discardIO{} - }) -} - -type discardIO struct{} - -func (d *discardIO) Name() string { - return "discard" -} - -func (d *discardIO) Init(opt *Option) error { - return nil -} - -func (d *discardIO) Out() io.Writer { - return d -} - -func (d *discardIO) Err() io.Writer { - return d -} - -func (d *discardIO) In() io.Reader { - return d -} - -func (d *discardIO) Close() error { - return nil -} - -func (d *discardIO) Write(data []byte) (int, error) { - return len(data), nil -} - -func (d *discardIO) Read(p []byte) (int, error) { - block := make(chan struct{}) - <-block - return 0, nil -} diff --git a/daemon/containerio/hijack_conn.go b/daemon/containerio/hijack_conn.go deleted file mode 100644 index efb0a54ab..000000000 --- a/daemon/containerio/hijack_conn.go +++ /dev/null @@ -1,77 +0,0 @@ -package containerio - -import ( - "fmt" - "io" - "net" - "net/http" -) - -func init() { - Register(func() Backend { - return &hijackConn{} - }) -} - -type hijackConn struct { - hijack http.Hijacker - conn net.Conn - closed bool -} - -func createHijackConn() Backend { - return &hijackConn{} -} - -func (h *hijackConn) Name() string { - return "hijack" -} - -func (h *hijackConn) Init(opt *Option) error { - conn, _, err := opt.hijack.Hijack() - if err != nil { - return err - } - - // set raw mode - conn.Write([]byte{}) - - if opt.hijackUpgrade { - fmt.Fprintf(conn, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n") - } else { - fmt.Fprintf(conn, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n") - } - - h.hijack = opt.hijack - h.conn = conn - - return nil -} - -func (h *hijackConn) Out() io.Writer { - return h -} - -func (h *hijackConn) In() io.Reader { - return h -} - -func (h *hijackConn) Err() io.Writer { - return h -} - -func (h *hijackConn) Close() error { - if h.closed { - return nil - } - h.closed = true - return h.conn.Close() -} - -func (h *hijackConn) Write(data []byte) (int, error) { - return h.conn.Write(data) -} - -func (h *hijackConn) Read(p []byte) (int, error) { - return h.conn.Read(p) -} diff --git a/daemon/containerio/io.go b/daemon/containerio/io.go new file mode 100644 index 000000000..30e5a9b0c --- /dev/null +++ b/daemon/containerio/io.go @@ -0,0 +1,197 @@ +package containerio + +import ( + "io" + "time" + + "github.com/alibaba/pouch/daemon/logger" + "github.com/alibaba/pouch/daemon/logger/crilog" + "github.com/alibaba/pouch/pkg/multierror" + "github.com/alibaba/pouch/pkg/streams" + + "github.com/containerd/containerd/cio" + "github.com/sirupsen/logrus" +) + +var ( + logcopierCloseTimeout = 10 * time.Second + streamCloseTimeout = 10 * time.Second +) + +// wrapcio will wrap the DirectIO and IO. +// +// When the task exits, the containerd client will close the wrapcio. +type wrapcio struct { + cio.IO + + ctrio *IO +} + +func (wcio *wrapcio) Wait() { + wcio.IO.Wait() + wcio.ctrio.Wait() +} + +func (wcio *wrapcio) Close() error { + wcio.IO.Close() + + return wcio.ctrio.Close() +} + +// IO represents the streams and logger. 
+type IO struct { + id string + useStdin bool + stream *streams.Stream + + logdriver logger.LogDriver + logcopier *logger.LogCopier + criLog *crilog.Log +} + +// NewIO return IO instance. +func NewIO(id string, withStdin bool) *IO { + s := streams.NewStream() + if withStdin { + s.NewStdinInput() + } else { + s.NewDiscardStdinInput() + } + + return &IO{ + id: id, + useStdin: withStdin, + stream: s, + } +} + +// Reset reset the logdriver. +func (ctrio *IO) Reset() { + if err := ctrio.Close(); err != nil { + logrus.WithError(err).WithField("process", ctrio.id). + Warnf("failed to close during reset IO") + } + + if ctrio.useStdin { + ctrio.stream.NewStdinInput() + } else { + ctrio.stream.NewDiscardStdinInput() + } + ctrio.logdriver = nil + ctrio.logcopier = nil + ctrio.criLog = nil +} + +// SetLogDriver sets log driver to the IO. +func (ctrio *IO) SetLogDriver(logdriver logger.LogDriver) { + ctrio.logdriver = logdriver +} + +// Stream is used to export the stream field. +func (ctrio *IO) Stream() *streams.Stream { + return ctrio.stream +} + +// AttachCRILog will create CRILog and register it into stream. +func (ctrio *IO) AttachCRILog(path string, withTerminal bool) error { + l, err := crilog.New(path, withTerminal) + if err != nil { + return err + } + + // NOTE: it might write the same data into two different files, when + // AttachCRILog is called for ReopenLog. + ctrio.stream.AddStdoutWriter(l.Stdout) + if l.Stderr != nil { + ctrio.stream.AddStderrWriter(l.Stderr) + } + + // NOTE: when close the previous crilog, it will evicted from the stream. + if ctrio.criLog != nil { + ctrio.criLog.Close() + } + ctrio.criLog = l + return nil +} + +// Wait wait for coping-data job. +func (ctrio *IO) Wait() { + waitCh := make(chan struct{}) + go func() { + defer close(waitCh) + ctrio.stream.Wait() + }() + + select { + case <-waitCh: + case <-time.After(streamCloseTimeout): + logrus.Warnf("stream doesn't exit in time") + } +} + +// Close closes the stream and the logger. +func (ctrio *IO) Close() error { + multiErrs := new(multierror.Multierrors) + + ctrio.Wait() + if err := ctrio.stream.Close(); err != nil { + multiErrs.Append(err) + } + + if ctrio.logdriver != nil { + if ctrio.logcopier != nil { + waitCh := make(chan struct{}) + go func() { + defer close(waitCh) + ctrio.logcopier.Wait() + }() + select { + case <-waitCh: + case <-time.After(logcopierCloseTimeout): + logrus.Warnf("logcopier doesn't exit in time") + } + } + + if err := ctrio.logdriver.Close(); err != nil { + multiErrs.Append(err) + } + } + + if ctrio.criLog != nil { + if err := ctrio.criLog.Close(); err != nil { + multiErrs.Append(err) + } + } + + if multiErrs.Size() > 0 { + return multiErrs + } + return nil +} + +// InitContainerIO will start logger and coping data from fifo. 
+func (ctrio *IO) InitContainerIO(dio *DirectIO) (cio.IO, error) { + if err := ctrio.startLogging(); err != nil { + return nil, err + } + + ctrio.stream.CopyPipes(streams.Pipes{ + Stdin: dio.Stdin, + Stdout: dio.Stdout, + Stderr: dio.Stderr, + }) + return &wrapcio{IO: dio, ctrio: ctrio}, nil +} + +func (ctrio *IO) startLogging() error { + if ctrio.logdriver == nil { + return nil + } + + ctrio.logcopier = logger.NewLogCopier(ctrio.logdriver, map[string]io.Reader{ + "stdout": ctrio.stream.NewStdoutPipe(), + "stderr": ctrio.stream.NewStderrPipe(), + }) + ctrio.logcopier.StartCopy() + return nil +} diff --git a/daemon/containerio/jsonfile.go b/daemon/containerio/jsonfile.go deleted file mode 100644 index e95b243b4..000000000 --- a/daemon/containerio/jsonfile.go +++ /dev/null @@ -1,162 +0,0 @@ -package containerio - -import ( - "bufio" - "encoding/json" - "io" - "os" - "path/filepath" - "strings" - "time" - - "github.com/alibaba/pouch/daemon/logger" - "github.com/alibaba/pouch/daemon/logger/jsonfile" - - "github.com/sirupsen/logrus" -) - -func init() { - Register(func() Backend { - return &jsonFile{} - }) -} - -var jsonFilePathName = "json.log" - -// TODO(fuwei): add compress/logrotate configuration -type jsonFile struct { - closed bool - - copier *jsonfile.JSONLogFile - - stdoutWriter io.WriteCloser - stderrWriter io.WriteCloser -} - -func (jf *jsonFile) Name() string { - return "jsonfile" -} - -func (jf *jsonFile) Init(opt *Option) error { - rootDir := opt.info.ContainerRootDir - if _, err := os.Stat(rootDir); err != nil { - return err - } - - logPath := filepath.Join(rootDir, jsonFilePathName) - attrs, err := opt.info.ExtraAttributes(nil) - if err != nil { - return err - } - - var extra []byte - if len(attrs) > 0 { - var err error - extra, err = json.Marshal(attrs) - if err != nil { - return err - } - } - - marshalFunc := func(msg *logger.LogMessage) ([]byte, error) { - return jsonfile.Marshal(msg, extra) - } - - w, err := jsonfile.NewJSONLogFile(logPath, 0644, marshalFunc) - if err != nil { - return err - } - - stdoutReader, stdoutWriter := io.Pipe() - stderrReader, stderrWriter := io.Pipe() - - jf.copier, jf.stdoutWriter, jf.stderrWriter = w, stdoutWriter, stderrWriter - go jf.copy("stdout", stdoutReader) - go jf.copy("stderr", stderrReader) - - return nil -} - -func (jf *jsonFile) In() io.Reader { - return nil -} - -func (jf *jsonFile) Out() io.Writer { - return jf.stdoutWriter -} - -func (jf *jsonFile) Err() io.Writer { - return jf.stderrWriter -} - -func (jf *jsonFile) Close() error { - if jf.closed { - return nil - } - - if err := jf.stdoutWriter.Close(); err != nil { - return err - } - - if err := jf.stderrWriter.Close(); err != nil { - return err - } - - if err := jf.copier.Close(); err != nil { - return err - } - - jf.closed = true - return nil -} - -func (jf *jsonFile) copy(source string, reader io.ReadCloser) { - var ( - bs []byte - err error - - firstPartial = true - isPartial bool - createdTime time.Time - - defaultBufSize = 16 * 1024 - ) - - defer reader.Close() - br := bufio.NewReaderSize(reader, defaultBufSize) - - for { - bs, isPartial, err = br.ReadLine() - if err != nil { - if err != io.EOF { - logrus.Errorf("failed to copy %v message into jsonfile: %v", source, err) - } - return - } - - // NOTE: The partial content will share the same timestamp. 
- if firstPartial { - createdTime = time.Now().UTC() - } - - if isPartial { - firstPartial = false - } else { - firstPartial = true - bs = append(bs, '\n') - } - - if err = jf.copier.WriteLogMessage(&logger.LogMessage{ - Source: source, - Line: bs, - Timestamp: createdTime, - }); err != nil { - if strings.Contains(err.Error(), os.ErrClosed.Error()) { - logrus.Warnf("failed to copy %v message into jsonfile: the container may be stopped: %v", source, err) - return - } - logrus.Errorf("failed to copy %v message into jsonfile: %v", source, err) - return - } - } -} diff --git a/daemon/containerio/options.go b/daemon/containerio/options.go deleted file mode 100644 index 5efe2ec3f..000000000 --- a/daemon/containerio/options.go +++ /dev/null @@ -1,156 +0,0 @@ -package containerio - -import ( - "io" - "net/http" - "os" - - "github.com/alibaba/pouch/cri/stream/remotecommand" - "github.com/alibaba/pouch/daemon/logger" -) - -// Option is used to pass some data into ContainerIO. -// -// FIXME(fuwei): use logger.Info to separate options and backends. -type Option struct { - info logger.Info - - id string - stdin bool - muxDisabled bool - backends map[string]struct{} - hijack http.Hijacker - hijackUpgrade bool - stdinBackend string - pipe *io.PipeWriter - streams *remotecommand.Streams - criLogFile *os.File -} - -// NewOption creates the Option instance. -func NewOption(opts ...func(*Option)) *Option { - opt := &Option{} - - for _, o := range opts { - o(opt) - } - - return opt -} - -// WithID specified the container's id. -func WithID(id string) func(*Option) { - return func(opt *Option) { - opt.id = id - } -} - -// WithLoggerInfo specified the container's logger information. -func WithLoggerInfo(info logger.Info) func(*Option) { - return func(opt *Option) { - opt.info = info - } -} - -// WithStdin specified whether open the container's stdin. -func WithStdin(stdin bool) func(*Option) { - return func(opt *Option) { - opt.stdin = stdin - } -} - -// WithMuxDisabled specified whether mux stdout & stderr of container IO. -func WithMuxDisabled(muxDisabled bool) func(*Option) { - return func(opt *Option) { - opt.muxDisabled = muxDisabled - } -} - -// WithDiscard specified the discard backend. -func WithDiscard() func(*Option) { - return func(opt *Option) { - if opt.backends == nil { - opt.backends = make(map[string]struct{}) - } - opt.backends["discard"] = struct{}{} - } -} - -// WithJSONFile specified the jsonfile backend. -func WithJSONFile() func(*Option) { - return func(opt *Option) { - if opt.backends == nil { - opt.backends = make(map[string]struct{}) - } - opt.backends["jsonfile"] = struct{}{} - } -} - -// WithSyslog specified the syslog backend. -func WithSyslog() func(*Option) { - return func(opt *Option) { - if opt.backends == nil { - opt.backends = make(map[string]struct{}) - } - opt.backends["syslog"] = struct{}{} - } -} - -// WithHijack specified the hijack backend. -func WithHijack(hi http.Hijacker, upgrade bool) func(*Option) { - return func(opt *Option) { - if opt.backends == nil { - opt.backends = make(map[string]struct{}) - } - opt.backends["hijack"] = struct{}{} - opt.hijack = hi - opt.hijackUpgrade = upgrade - } -} - -// WithStdinHijack sepcified the stdin with hijack. -func WithStdinHijack() func(*Option) { - return func(opt *Option) { - opt.stdinBackend = "hijack" - } -} - -// WithPipe specified the pipe backend. 
-func WithPipe(pipe *io.PipeWriter) func(*Option) { - return func(opt *Option) { - if opt.backends == nil { - opt.backends = make(map[string]struct{}) - } - opt.backends["pipe"] = struct{}{} - opt.pipe = pipe - } -} - -// WithStreams specified the stream backend. -func WithStreams(streams *remotecommand.Streams) func(*Option) { - return func(opt *Option) { - if opt.backends == nil { - opt.backends = make(map[string]struct{}) - } - opt.backends["streams"] = struct{}{} - opt.streams = streams - } -} - -// WithStdinStream specified the stdin with stream. -func WithStdinStream() func(*Option) { - return func(opt *Option) { - opt.stdinBackend = "streams" - } -} - -// WithCriLogFile specified the cri log file backend. -func WithCriLogFile(criLogFile *os.File) func(*Option) { - return func(opt *Option) { - if opt.backends == nil { - opt.backends = make(map[string]struct{}) - } - opt.backends["cri-log-file"] = struct{}{} - opt.criLogFile = criLogFile - } -} diff --git a/daemon/containerio/pipe.go b/daemon/containerio/pipe.go deleted file mode 100644 index 25b5bf6a5..000000000 --- a/daemon/containerio/pipe.go +++ /dev/null @@ -1,40 +0,0 @@ -package containerio - -import ( - "io" -) - -func init() { - Register(func() Backend { - return &pipe{} - }) -} - -type pipe struct { - pipeWriter *io.PipeWriter -} - -func (p *pipe) Name() string { - return "pipe" -} - -func (p *pipe) Init(opt *Option) error { - p.pipeWriter = opt.pipe - return nil -} - -func (p *pipe) Out() io.Writer { - return p.pipeWriter -} - -func (p *pipe) In() io.Reader { - return nil -} - -func (p *pipe) Err() io.Writer { - return p.pipeWriter -} - -func (p *pipe) Close() error { - return p.pipeWriter.Close() -} diff --git a/daemon/containerio/streams.go b/daemon/containerio/streams.go deleted file mode 100644 index 43dae8660..000000000 --- a/daemon/containerio/streams.go +++ /dev/null @@ -1,64 +0,0 @@ -package containerio - -import ( - "io" - - "github.com/alibaba/pouch/cri/stream/remotecommand" -) - -func init() { - Register(func() Backend { - return &streamIO{} - }) -} - -type streamIO struct { - streams *remotecommand.Streams - closed bool -} - -func (s *streamIO) Name() string { - return "streams" -} - -func (s *streamIO) Init(opt *Option) error { - s.streams = opt.streams - - return nil -} - -func (s *streamIO) Out() io.Writer { - return s.streams.StdoutStream -} - -func (s *streamIO) In() io.Reader { - return s.streams.StdinStream -} - -func (s *streamIO) Err() io.Writer { - return s.streams.StderrStream -} - -func (s *streamIO) Close() error { - if s.closed { - return nil - } - - for _, closer := range []io.Closer{ - s.streams.StdinStream, - s.streams.StdoutStream, - s.streams.StderrStream, - } { - if closer != nil { - closer.Close() - } - } - - if s.streams.StreamCh != nil { - s.streams.StreamCh <- struct{}{} - } - - s.closed = true - - return nil -} diff --git a/daemon/containerio/syslog.go b/daemon/containerio/syslog.go deleted file mode 100644 index 68cb42065..000000000 --- a/daemon/containerio/syslog.go +++ /dev/null @@ -1,73 +0,0 @@ -package containerio - -import ( - "io" - - "github.com/alibaba/pouch/daemon/logger" - "github.com/alibaba/pouch/daemon/logger/syslog" -) - -func init() { - Register(func() Backend { - return &syslogging{} - }) -} - -type customWriter struct { - w func(p []byte) (int, error) -} - -func (cw *customWriter) Write(p []byte) (int, error) { - return cw.w(p) -} - -type syslogging struct { - w *syslog.Syslog -} - -func (s *syslogging) Init(opt *Option) error { - w, err := syslog.NewSyslog(opt.info) - 
if err != nil { - return err - } - s.w = w - return nil -} - -func (s *syslogging) Name() string { - return "syslog" -} - -func (s *syslogging) In() io.Reader { - return nil -} - -func (s *syslogging) Out() io.Writer { - return &customWriter{w: s.sourceWriteFunc("stdout")} -} - -func (s *syslogging) Err() io.Writer { - return &customWriter{w: s.sourceWriteFunc("stderr")} -} - -func (s *syslogging) Close() error { - return s.w.Close() -} - -func (s *syslogging) sourceWriteFunc(source string) func([]byte) (int, error) { - return func(p []byte) (int, error) { - if len(p) == 0 { - return 0, nil - } - - msg := &logger.LogMessage{ - Source: source, - Line: p, - } - - if err := s.w.WriteLogMessage(msg); err != nil { - return 0, err - } - return len(p), nil - } -} diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go new file mode 100644 index 000000000..46ad86400 --- /dev/null +++ b/daemon/logger/copier.go @@ -0,0 +1,81 @@ +package logger + +import ( + "bufio" + "io" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +// LogCopier is used to copy data from stream and write it into LogDriver. +type LogCopier struct { + sync.WaitGroup + srcs map[string]io.Reader + dst LogDriver +} + +// NewLogCopier creates copier for logger. +func NewLogCopier(dst LogDriver, srcs map[string]io.Reader) *LogCopier { + return &LogCopier{ + srcs: srcs, + dst: dst, + } +} + +// StartCopy starts to read the data and write it into logger. +func (lc *LogCopier) StartCopy() { + for source, r := range lc.srcs { + lc.Add(1) + go lc.copy(source, r) + } +} + +func (lc *LogCopier) copy(source string, reader io.Reader) { + defer logrus.Debugf("finish %s stream type logcopy for %s", source, lc.dst.Name()) + defer lc.Done() + + var ( + bs []byte + err error + + firstPartial = true + isPartial bool + createdTime time.Time + + defaultBufSize = 16 * 1024 + ) + + br := bufio.NewReaderSize(reader, defaultBufSize) + for { + bs, isPartial, err = br.ReadLine() + if err != nil { + if err != io.EOF { + logrus.WithError(err). + Errorf("failed to copy into %v-%v", lc.dst.Name(), source) + } + return + } + + // NOTE: The partial content will share the same timestamp. 
+ if firstPartial { + createdTime = time.Now().UTC() + } + + if isPartial { + firstPartial = false + } else { + firstPartial = true + bs = append(bs, '\n') + } + + if err = lc.dst.WriteLogMessage(&LogMessage{ + Source: source, + Line: bs, + Timestamp: createdTime, + }); err != nil { + logrus.WithError(err).Errorf("failed to copy into %v-%v", lc.dst.Name(), source) + } + } +} diff --git a/daemon/logger/copier_test.go b/daemon/logger/copier_test.go new file mode 100644 index 000000000..dc9c07770 --- /dev/null +++ b/daemon/logger/copier_test.go @@ -0,0 +1,93 @@ +package logger + +import ( + "bytes" + "encoding/json" + "io" + "sync" + "testing" + "time" +) + +type fakeJSONFileLogDriver struct { + sync.Mutex + *json.Encoder +} + +func (ld *fakeJSONFileLogDriver) Name() string { + return "fake-jsonfile" +} + +func (ld *fakeJSONFileLogDriver) WriteLogMessage(msg *LogMessage) error { + ld.Lock() + defer ld.Unlock() + return ld.Encode(msg) +} + +func (ld *fakeJSONFileLogDriver) Close() error { + return nil +} + +func TestLogCopierBasic(t *testing.T) { + stdoutContent := "data from testing process stdout\n" + stderrContent := "data from testing process stderr\n" + + procStdout, procStderr := bytes.NewBuffer(nil), bytes.NewBuffer(nil) + for i := 0; i < 100; i++ { + if _, err := procStdout.Write([]byte(stdoutContent)); err != nil { + t.Fatalf("failed to write data: %v", err) + } + if _, err := procStderr.Write([]byte(stderrContent)); err != nil { + t.Fatalf("failed to write data: %v", err) + } + } + + jsonMsgBuf := bytes.NewBuffer(nil) + lcopier := NewLogCopier(&fakeJSONFileLogDriver{Encoder: json.NewEncoder(jsonMsgBuf)}, + map[string]io.Reader{ + "stdout": procStdout, + "stderr": procStderr, + }, + ) + + // NOTE: the bytes.Buffer.Read will return io.EOF when there is no data. + lcopier.StartCopy() + + waitCh := make(chan struct{}) + go func() { + lcopier.Wait() + close(waitCh) + }() + select { + case <-time.After(3 * time.Second): + t.Fatal("take long time to finish copy") + case <-waitCh: + } + + // check the data + dec := json.NewDecoder(jsonMsgBuf) + for { + var m LogMessage + err := dec.Decode(&m) + if err == io.EOF { + return + } + + if err != nil { + t.Fatalf("failed to decode the json: %v", err) + } + + switch m.Source { + case "stdout": + if got := string(m.Line); got != stdoutContent { + t.Fatalf("[stdout] expected (%s), but got (%s)", stdoutContent, got) + } + case "stderr": + if got := string(m.Line); got != stderrContent { + t.Fatalf("[stderr] expected (%s), but got (%s)", stderrContent, got) + } + default: + t.Fatalf("invalid the source type: %v", m.Source) + } + } +} diff --git a/daemon/logger/crilog/log.go b/daemon/logger/crilog/log.go new file mode 100644 index 000000000..1fc61bfa7 --- /dev/null +++ b/daemon/logger/crilog/log.go @@ -0,0 +1,150 @@ +package crilog + +import ( + "bufio" + "bytes" + "io" + "os" + "time" + + "github.com/sirupsen/logrus" + "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" +) + +const ( + // delimiter used in cri logging format. + delimiter = ' ' + // eol is end-of-line. + eol = '\n' + // timestampFormat is the timestamp format used in cri logging format. + timestampFormat = time.RFC3339Nano + // pipeBufSize is the system PIPE_BUF size, on linux it is 4096 bytes. + pipeBufSize = 4096 + // bufSize is the size of the read buffer. 
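+	// Each line written to the CRI log file has the form
+	//   <RFC3339Nano timestamp> <stream> <tag> <data>\n
+	// so the payload read from the pipe per line is capped at
+	// 4096 - 35 (timestamp layout) - 6 ("stdout") - 2 - 1 = 4052 bytes;
+	// longer process lines are split across entries and tagged as partial.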
+ bufSize = pipeBufSize - len(timestampFormat) - len(streamStdout) - 2 /*2 delimiter*/ - 1 /*eol*/ + // redirectLogCloseTimeout is used to wait for redirectLogs + redirectLogCloseTimeout = 10 * time.Second +) + +// streamType is the type of the stream. +type streamType string + +const ( + // streamStdout stream type. + streamStdout streamType = "stdout" + // streamStderr stream type. + streamStderr streamType = "stderr" +) + +// Log represents cri log driver. +// +// NOTE: it might be changed caused by ReopenLog API. +type Log struct { + Stdout, Stderr io.WriteCloser + closeFn func() +} + +// Close closes CRI log. +func (l *Log) Close() error { + if l.Stdout != nil { + l.Stdout.Close() + } + + if l.Stderr != nil { + l.Stderr.Close() + } + + if l.closeFn != nil { + l.closeFn() + } + return nil +} + +// New returns WriteCloser for stream. +func New(path string, withTerminal bool) (*Log, error) { + // TODO(fuweid): need to serialize writer since both the stdout and + // stderr share the same writer. + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) + if err != nil { + return nil, err + } + + var ( + stdoutw, stderrw io.WriteCloser + stdoutStopCh, stderrStopCh <-chan struct{} + ) + + stdoutw, stdoutStopCh = newCRILogger(path, f, streamStdout) + if !withTerminal { + stderrw, stderrStopCh = newCRILogger(path, f, streamStderr) + } + + closeFn := func() { + if stdoutStopCh != nil { + select { + case <-stdoutStopCh: + case <-time.After(redirectLogCloseTimeout): + logrus.WithField("cri-log", path). + Warn("failed to stop stdout's redirectLogs") + } + } + + if stderrStopCh != nil { + select { + case <-stderrStopCh: + case <-time.After(redirectLogCloseTimeout): + logrus.WithField("cri-log", path). + Warn("failed to stop stderr's redirectLogs") + } + } + f.Close() + } + + return &Log{ + Stdout: stdoutw, + Stderr: stderrw, + closeFn: closeFn, + }, nil +} + +func newCRILogger(path string, w io.Writer, typ streamType) (io.WriteCloser, <-chan struct{}) { + stopCh := make(chan struct{}) + pir, piw := io.Pipe() + go func() { + redirectLogs(path, w, pir, typ) + close(stopCh) + }() + return piw, stopCh +} + +func redirectLogs(path string, w io.Writer, r io.ReadCloser, stream streamType) { + defer r.Close() + + streamBytes := []byte(stream) + delimiterBytes := []byte{delimiter} + partialBytes := []byte(runtime.LogTagPartial) + fullBytes := []byte(runtime.LogTagFull) + br := bufio.NewReaderSize(r, bufSize) + for { + lineBytes, isPrefix, err := br.ReadLine() + if err != nil { + if err == io.EOF { + logrus.Infof("finish redirecting log file(name=%v)", path) + } else { + logrus.WithError(err).Errorf("failed to redirect log file(name=%v)", path) + } + return + } + tagBytes := fullBytes + if isPrefix { + tagBytes = partialBytes + } + timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano) + data := bytes.Join([][]byte{timestampBytes, streamBytes, tagBytes, lineBytes}, delimiterBytes) + data = append(data, eol) + + if _, err := w.Write(data); err != nil { + logrus.Errorf("failed to write %q log to log file: %v", stream, err) + } + } +} diff --git a/daemon/logger/jsonfile/jsonfile.go b/daemon/logger/jsonfile/jsonfile.go index 7ad44132f..5bb08f6b3 100644 --- a/daemon/logger/jsonfile/jsonfile.go +++ b/daemon/logger/jsonfile/jsonfile.go @@ -1,13 +1,17 @@ package jsonfile import ( + "encoding/json" "fmt" "os" + "path/filepath" "sync" "github.com/alibaba/pouch/daemon/logger" ) +var jsonFilePathName = "json.log" + //MarshalFunc is the function of marshal the logMessage type MarshalFunc 
func(message *logger.LogMessage) ([]byte, error) @@ -21,6 +25,33 @@ type JSONLogFile struct { marshalFunc MarshalFunc } +// Init initializes the jsonfile log driver. +func Init(info logger.Info) (logger.LogDriver, error) { + if _, err := os.Stat(info.ContainerRootDir); err != nil { + return nil, err + } + + logPath := filepath.Join(info.ContainerRootDir, jsonFilePathName) + + attrs, err := info.ExtraAttributes(nil) + if err != nil { + return nil, err + } + + var extra []byte + if len(attrs) > 0 { + var err error + extra, err = json.Marshal(attrs) + if err != nil { + return nil, err + } + } + + return NewJSONLogFile(logPath, 0644, func(msg *logger.LogMessage) ([]byte, error) { + return Marshal(msg, extra) + }) +} + // NewJSONLogFile returns new JSONLogFile instance. func NewJSONLogFile(logPath string, perms os.FileMode, marshalFunc MarshalFunc) (*JSONLogFile, error) { f, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms) @@ -36,6 +67,11 @@ func NewJSONLogFile(logPath string, perms os.FileMode, marshalFunc MarshalFunc) }, nil } +// Name return the log driver's name. +func (lf *JSONLogFile) Name() string { + return "json-file" +} + // WriteLogMessage will write the LogMessage into the file. func (lf *JSONLogFile) WriteLogMessage(msg *logger.LogMessage) error { b, err := lf.marshalFunc(msg) diff --git a/daemon/logger/jsonfile/utils_test.go b/daemon/logger/jsonfile/utils_test.go index 5747cc28b..8547530b3 100644 --- a/daemon/logger/jsonfile/utils_test.go +++ b/daemon/logger/jsonfile/utils_test.go @@ -15,6 +15,8 @@ import ( "github.com/alibaba/pouch/pkg/utils" ) +var _ logger.LogDriver = &JSONLogFile{} + func generateFileBytes(lines int) []byte { buf := bytes.NewBuffer(nil) defer buf.Reset() diff --git a/daemon/logger/syslog/syslog.go b/daemon/logger/syslog/syslog.go index 6dea715e6..b25b18333 100644 --- a/daemon/logger/syslog/syslog.go +++ b/daemon/logger/syslog/syslog.go @@ -37,6 +37,11 @@ func defaultOptions() *options { } } +// Init return the Syslog log driver. +func Init(info logger.Info) (logger.LogDriver, error) { + return NewSyslog(info) +} + // NewSyslog returns new Syslog based on the log config. func NewSyslog(info logger.Info) (*Syslog, error) { opt, err := parseOptions(info) @@ -51,6 +56,11 @@ func NewSyslog(info logger.Info) (*Syslog, error) { }, nil } +// Name return the log driver's name. +func (s *Syslog) Name() string { + return "syslog" +} + // WriteLogMessage will write the LogMessage. func (s *Syslog) WriteLogMessage(msg *logger.LogMessage) error { line := string(msg.Line) diff --git a/daemon/logger/syslog/syslog_test.go b/daemon/logger/syslog/syslog_test.go index be9226bd8..311ab53bd 100644 --- a/daemon/logger/syslog/syslog_test.go +++ b/daemon/logger/syslog/syslog_test.go @@ -20,6 +20,8 @@ import ( "github.com/RackSec/srslog" ) +var _ logger.LogDriver = &Syslog{} + type testingTB interface { Fatalf(format string, args ...interface{}) } diff --git a/daemon/logger/types.go b/daemon/logger/types.go new file mode 100644 index 000000000..8e1be2dc3 --- /dev/null +++ b/daemon/logger/types.go @@ -0,0 +1,10 @@ +package logger + +// LogDriver represents any kind of log drivers, such as jsonfile, syslog. 
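+//
+// jsonfile.JSONLogFile and syslog.Syslog are the in-tree implementations;
+// LogCopier reads a process's stdout/stderr and forwards every line to a
+// LogDriver as a LogMessage.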
+type LogDriver interface { + Name() string + + WriteLogMessage(msg *LogMessage) error + + Close() error +} diff --git a/daemon/mgr/container.go b/daemon/mgr/container.go index 01c42dbcc..b4eda4a6c 100644 --- a/daemon/mgr/container.go +++ b/daemon/mgr/container.go @@ -27,6 +27,7 @@ import ( "github.com/alibaba/pouch/pkg/errtypes" "github.com/alibaba/pouch/pkg/meta" mountutils "github.com/alibaba/pouch/pkg/mount" + "github.com/alibaba/pouch/pkg/streams" "github.com/alibaba/pouch/pkg/utils" "github.com/alibaba/pouch/storage/quota" volumetypes "github.com/alibaba/pouch/storage/volume/types" @@ -34,7 +35,6 @@ import ( "github.com/containerd/cgroups" containerdtypes "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/mount" - "github.com/docker/docker/pkg/stdcopy" "github.com/docker/libnetwork" "github.com/go-openapi/strfmt" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -83,8 +83,11 @@ type ContainerMgr interface { // Stats of a container. Stats(ctx context.Context, name string) (*containerdtypes.Metric, *cgroups.Metrics, error) - // Attach a container. - Attach(ctx context.Context, name string, attach *AttachConfig) error + // AttachContainerIO attach stream to container IO. + AttachContainerIO(ctx context.Context, name string, cfg *streams.AttachConfig) error + + // AttachCRILog attach cri log to container IO. + AttachCRILog(ctx context.Context, name string, path string) error // Rename renames a container. Rename(ctx context.Context, oldName string, newName string) error @@ -113,7 +116,7 @@ type ContainerMgr interface { CreateExec(ctx context.Context, name string, config *types.ExecCreateConfig) (string, error) // StartExec executes a new process in container. - StartExec(ctx context.Context, execid string, attach *AttachConfig) error + StartExec(ctx context.Context, execid string, cfg *streams.AttachConfig) error // InspectExec returns low-level information about exec command. InspectExec(ctx context.Context, execid string) (*types.ContainerExecInspect, error) @@ -241,12 +244,18 @@ func (mgr *ContainerManager) Restore(ctx context.Context) error { } // recover the running or paused container. 
- io, err := mgr.openContainerIO(container) + cntrio, err := mgr.initContainerIO(container) if err != nil { - logrus.Errorf("failed to recover container %s: %v", id, err) + logrus.Errorf("failed to init container IO %s: %v", id, err) + return err } - err = mgr.Client.RecoverContainer(ctx, id, io) + if err := mgr.initLogDriverBeforeStart(container); err != nil { + logrus.Errorf("failed to init log driver %s: %v", id, err) + return err + } + + err = mgr.Client.RecoverContainer(ctx, id, cntrio) if err != nil && strings.Contains(err.Error(), "not found") { logrus.Infof("failed to recover container %s (not found, executes mark stopped and release resources): %v", id, err) if err := mgr.markStoppedAndRelease(container, nil); err != nil { @@ -255,7 +264,7 @@ func (mgr *ContainerManager) Restore(ctx context.Context) error { } else if err != nil { logrus.Errorf("failed to recover container %s: %v", id, err) // release io - io.Close() + cntrio.Close() mgr.IOs.Remove(id) } @@ -375,6 +384,11 @@ func (mgr *ContainerManager) Create(ctx context.Context, name string, config *ty SnapshotID: snapID, } + if _, err := mgr.initContainerIO(container); err != nil { + logrus.Errorf("failed to initialise IO: %v", err) + return nil, err + } + // merge image's config into container if err := container.merge(func() (ocispec.ImageConfig, error) { return mgr.ImageMgr.GetOCIImageConfig(ctx, config.Image) @@ -661,10 +675,9 @@ func (mgr *ContainerManager) createContainerdContainer(ctx context.Context, c *C return err } - // open container's stdio. - io, err := mgr.openContainerIO(c) - if err != nil { - return errors.Wrap(err, "failed to open io") + // init log driver + if err := mgr.initLogDriverBeforeStart(c); err != nil { + return errors.Wrap(err, "failed to initialize log driver") } // set container's LogPath @@ -682,7 +695,7 @@ func (mgr *ContainerManager) createContainerdContainer(ctx context.Context, c *C Labels: c.Config.Labels, Runtime: runtime, Spec: sw.s, - IO: io, + IO: mgr.IOs.Get(c.ID), RootFSProvided: c.RootFSProvided, BaseFS: c.BaseFS, SnapshotID: c.SnapshotID, @@ -892,19 +905,44 @@ func (mgr *ContainerManager) Unpause(ctx context.Context, name string) error { return nil } -// Attach attachs a container's io. -func (mgr *ContainerManager) Attach(ctx context.Context, name string, attach *AttachConfig) error { +// AttachContainerIO attachs a container's io. +func (mgr *ContainerManager) AttachContainerIO(ctx context.Context, name string, cfg *streams.AttachConfig) error { c, err := mgr.container(name) if err != nil { return err } - _, err = mgr.openAttachIO(c, attach) + cntrio := mgr.IOs.Get(c.ID) + cfg.Terminal = c.Config.Tty + + // NOTE: the AttachContainerIO might use the hijack's connection as + // stdin in the AttachConfig. If we close it directly, the stdout/stderr + // will return the `using closed connection` error. As a result, the + // Attach will return the error. We need to use pipe here instead of + // origin one and let the caller closes the stdin by themself. + if c.Config.OpenStdin && cfg.UseStdin { + oldStdin := cfg.Stdin + pstdinr, pstdinw := io.Pipe() + go func() { + defer pstdinw.Close() + io.Copy(pstdinw, oldStdin) + }() + cfg.Stdin = pstdinr + cfg.CloseStdin = true + } else { + cfg.UseStdin = false + } + return <-cntrio.Stream().Attach(ctx, cfg) +} + +// AttachCRILog adds cri log to a container. 
+func (mgr *ContainerManager) AttachCRILog(ctx context.Context, name string, logPath string) error { + c, err := mgr.container(name) if err != nil { return err } - return nil + return mgr.attachCRILog(c, logPath) } // Rename renames a container. @@ -1100,11 +1138,14 @@ func (mgr *ContainerManager) Remove(ctx context.Context, name string, options *t // When removing a container, we have set up such rule for object removing sequences: // 1. container object in pouchd's memory; // 2. meta.json for container in local disk. + // 3. remove the container IO from cache // remove name mgr.NameToID.Remove(c.Name) // remove container cache mgr.cache.Remove(c.ID) + // remove the container IO + mgr.IOs.Remove(c.ID) // remove meta.json for container in local disk if err := mgr.Store.Remove(c.Key()); err != nil { @@ -1417,25 +1458,6 @@ func (mgr *ContainerManager) Disconnect(ctx context.Context, containerName, netw return nil } -func (mgr *ContainerManager) openContainerIO(c *Container) (*containerio.IO, error) { - if io := mgr.IOs.Get(c.ID); io != nil { - return io, nil - } - - logInfo := mgr.convContainerToLoggerInfo(c) - options := []func(*containerio.Option){ - containerio.WithID(c.ID), - containerio.WithLoggerInfo(logInfo), - containerio.WithStdin(c.Config.OpenStdin), - } - - options = append(options, logOptionsForContainerio(c)...) - - io := containerio.NewIO(containerio.NewOption(options...)) - mgr.IOs.Put(c.ID, io) - return io, nil -} - func (mgr *ContainerManager) updateNetworkConfig(container *Container, networkIDOrName string, endpointConfig *types.EndpointSettings) error { if IsContainer(container.HostConfig.NetworkMode) { return fmt.Errorf("container sharing network namespace with another container or host cannot be connected to any other network") @@ -1537,77 +1559,54 @@ func (mgr *ContainerManager) updateNetworkSettings(container *Container, n libne return nil } -func (mgr *ContainerManager) openExecIO(id string, attach *AttachConfig) (*containerio.IO, error) { - if io := mgr.IOs.Get(id); io != nil { - return io, nil - } - - options := []func(*containerio.Option){ - containerio.WithID(id), - } - - if attach != nil { - options = append(options, attachConfigToOptions(attach)...) - options = append(options, containerio.WithStdin(attach.Stdin)) - options = append(options, containerio.WithMuxDisabled(attach.MuxDisabled)) - } else { - options = append(options, containerio.WithDiscard()) +func (mgr *ContainerManager) initContainerIO(c *Container) (*containerio.IO, error) { + if io := mgr.IOs.Get(c.ID); io != nil { + return nil, errors.Wrap(errtypes.ErrConflict, "failed to create containerIO") } - io := containerio.NewIO(containerio.NewOption(options...)) - mgr.IOs.Put(id, io) - return io, nil + cntrio := containerio.NewIO(c.ID, c.Config.OpenStdin) + mgr.IOs.Put(c.ID, cntrio) + return cntrio, nil } -func (mgr *ContainerManager) openAttachIO(c *Container, attach *AttachConfig) (*containerio.IO, error) { - logInfo := mgr.convContainerToLoggerInfo(c) - options := []func(*containerio.Option){ - containerio.WithID(c.ID), - containerio.WithLoggerInfo(logInfo), - } - options = append(options, logOptionsForContainerio(c)...) +func (mgr *ContainerManager) initLogDriverBeforeStart(c *Container) error { + var ( + cntrio *containerio.IO + err error + ) - if attach != nil { - options = append(options, attachConfigToOptions(attach)...) 
- options = append(options, containerio.WithStdin(attach.Stdin)) - } else { - options = append(options, containerio.WithDiscard()) + if cntrio = mgr.IOs.Get(c.ID); cntrio == nil { + cntrio, err = mgr.initContainerIO(c) + if err != nil { + return err + } } - io := mgr.IOs.Get(c.ID) - if io != nil { - io.AddBackend(containerio.NewOption(options...)) - } else { - io = containerio.NewIO(containerio.NewOption(options...)) + logDriver, err := logOptionsForContainerio(c, mgr.convContainerToLoggerInfo(c)) + if err != nil { + return err } - mgr.IOs.Put(c.ID, io) - return io, nil + cntrio.SetLogDriver(logDriver) + return nil } -func attachConfigToOptions(attach *AttachConfig) []func(*containerio.Option) { - options := []func(*containerio.Option){} - if attach.Hijack != nil { - // Attaching using http. - options = append(options, containerio.WithHijack(attach.Hijack, attach.Upgrade)) - if attach.Stdin { - options = append(options, containerio.WithStdinHijack()) - } - } else if attach.Pipe != nil { - // Attaching using pipe. - options = append(options, containerio.WithPipe(attach.Pipe)) - } else if attach.Streams != nil { - // Attaching using streams. - options = append(options, containerio.WithStreams(attach.Streams)) - if attach.Stdin { - options = append(options, containerio.WithStdinStream()) - } +func (mgr *ContainerManager) attachCRILog(c *Container, logPath string) error { + cntrio := mgr.IOs.Get(c.ID) + if cntrio == nil { + return errors.Wrap(errtypes.ErrNotfound, "failed to get containerIO") } - if attach.CriLogFile != nil { - options = append(options, containerio.WithCriLogFile(attach.CriLogFile)) + return cntrio.AttachCRILog(logPath, c.Config.Tty) +} + +func (mgr *ContainerManager) initExecIO(id string, withStdin bool) (*containerio.IO, error) { + if io := mgr.IOs.Get(id); io != nil { + return nil, errors.Wrap(errtypes.ErrConflict, "failed to create containerIO") } - return options + cntrio := containerio.NewIO(id, withStdin) + mgr.IOs.Put(id, cntrio) + return cntrio, nil } func (mgr *ContainerManager) markStoppedAndRelease(c *Container, m *ctrd.Message) error { @@ -1739,6 +1738,7 @@ func (mgr *ContainerManager) execExitedAndRelease(id string, m *ctrd.Message) er if !ok { return fmt.Errorf("invalid exec config type") } + execConfig.ExitCode = int64(m.ExitCode()) execConfig.Running = false execConfig.Error = m.RawError() @@ -1748,23 +1748,14 @@ func (mgr *ContainerManager) execExitedAndRelease(id string, m *ctrd.Message) er return nil } - if err := m.RawError(); err != nil { - var stdout io.Writer = eio.Stdout - if !execConfig.Tty && !eio.MuxDisabled { - stdout = stdcopy.NewStdWriter(stdout, stdcopy.Stdout) - } - stdout.Write([]byte(err.Error() + "\r\n")) - } - // close io eio.Close() mgr.IOs.Remove(id) - return nil } func (mgr *ContainerManager) releaseContainerResources(c *Container) error { - mgr.releaseContainerIOs(c.ID) + mgr.resetContainerIOs(c.ID) return mgr.releaseContainerNetwork(c) } @@ -1801,16 +1792,15 @@ func (mgr *ContainerManager) releaseContainerNetwork(c *Container) error { return nil } -// releaseContainerIOs releases container IO resources. -func (mgr *ContainerManager) releaseContainerIOs(containerID string) { +// resetContainerIOs resets container IO resources. 
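+//
+// Unlike the removed releaseContainerIOs, the IO object stays cached in
+// mgr.IOs so that it can be reused when the container starts again; Reset
+// only closes the current streams and log drivers and recreates the stdin
+// input.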
+func (mgr *ContainerManager) resetContainerIOs(containerID string) { // release resource io := mgr.IOs.Get(containerID) if io == nil { return } - io.Close() - mgr.IOs.Remove(containerID) + io.Reset() return } diff --git a/daemon/mgr/container_exec.go b/daemon/mgr/container_exec.go index 3f18785c3..6a6ff6058 100644 --- a/daemon/mgr/container_exec.go +++ b/daemon/mgr/container_exec.go @@ -9,9 +9,9 @@ import ( "github.com/alibaba/pouch/ctrd" "github.com/alibaba/pouch/pkg/errtypes" "github.com/alibaba/pouch/pkg/randomid" + "github.com/alibaba/pouch/pkg/streams" "github.com/alibaba/pouch/pkg/user" - "github.com/docker/docker/pkg/stdcopy" "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" ) @@ -57,41 +57,13 @@ func (mgr *ContainerManager) ResizeExec(ctx context.Context, execid string, opts } // StartExec executes a new process in container. -func (mgr *ContainerManager) StartExec(ctx context.Context, execid string, attach *AttachConfig) (err error) { +func (mgr *ContainerManager) StartExec(ctx context.Context, execid string, cfg *streams.AttachConfig) (err0 error) { // GetExecConfig should not error, since we have done this before call StartExec execConfig, err := mgr.GetExecConfig(ctx, execid) if err != nil { return err } - // FIXME(fuweid): make attachConfig consistent with execConfig - if attach != nil { - attach.Stdin = execConfig.AttachStdin - } - - eio, err := mgr.openExecIO(execid, attach) - if err != nil { - return err - } - - defer func() { - if err != nil { - var stdout io.Writer = eio.Stdout - if !execConfig.Tty && !eio.MuxDisabled { - stdout = stdcopy.NewStdWriter(stdout, stdcopy.Stdout) - } - stdout.Write([]byte(err.Error() + "\r\n")) - // set exec exit status - execConfig.Running = false - exitCode := 126 - execConfig.ExitCode = int64(exitCode) - - // close io to make hijack connection exit - eio.Close() - mgr.IOs.Remove(execid) - } - }() - c, err := mgr.container(execConfig.ContainerID) if err != nil { return err @@ -130,16 +102,53 @@ func (mgr *ContainerManager) StartExec(ctx context.Context, execid string, attac return err } - execConfig.Running = true + // NOTE: the StartExec might use the hijack's connection as + // stdin in the AttachConfig. If we close it directly, the stdout/stderr + // will return the `using closed connection` error. As a result, the + // Attach will return the error. We need to use pipe here instead of + // origin one and let the caller closes the stdin by themself. + if execConfig.AttachStdin && cfg.UseStdin { + oldStdin := cfg.Stdin + pstdinr, pstdinw := io.Pipe() + go func() { + defer pstdinw.Close() + io.Copy(pstdinw, oldStdin) + }() + cfg.Stdin = pstdinr + } else { + cfg.UseStdin = false + } + + // NOTE: always close stdin pipe for exec process + cfg.CloseStdin = true + eio, err := mgr.initExecIO(execid, cfg.UseStdin) + if err != nil { + return err + } + + attachErrCh := eio.Stream().Attach(ctx, cfg) + + defer func() { + if err0 != nil { + // set exec exit status + execConfig.Running = false + exitCode := 126 + execConfig.ExitCode = int64(exitCode) + eio.Close() + mgr.IOs.Remove(execid) + } + }() - err = mgr.Client.ExecContainer(ctx, &ctrd.Process{ + execConfig.Running = true + if err := mgr.Client.ExecContainer(ctx, &ctrd.Process{ ContainerID: execConfig.ContainerID, ExecID: execid, IO: eio, P: process, - }) - - return err + }); err != nil { + return err + } + return <-attachErrCh } // InspectExec returns low-level information about exec command. 
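The NOTE blocks in AttachContainerIO and StartExec describe the same trick: a hijacked HTTP connection is both the stdin source and the stdout/stderr sink, so the stream must never close the caller's stdin directly. Below is a minimal, self-contained sketch of that pipe-wrapping pattern (illustrative only, not part of this patch; the strings.Reader stands in for the hijacked connection).

package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	// conn stands in for the read side of a hijacked connection that also
	// carries the response; it must stay usable even after stdin is "closed".
	conn := strings.NewReader("exit\n")

	// Wrap conn in a pipe: the stream only ever sees pr.
	pr, pw := io.Pipe()
	go func() {
		defer pw.Close()
		io.Copy(pw, conn) // pump the caller's stdin into the pipe
	}()

	// The attach code may close pr whenever stdout/stderr finishes;
	// only the pipe is torn down, not the connection itself.
	buf := make([]byte, 16)
	n, _ := pr.Read(buf)
	fmt.Printf("read %q from the wrapped stdin\n", buf[:n])
	pr.Close()
}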
diff --git a/daemon/mgr/container_logger.go b/daemon/mgr/container_logger.go index 8e9c399df..22c3debc8 100644 --- a/daemon/mgr/container_logger.go +++ b/daemon/mgr/container_logger.go @@ -4,29 +4,28 @@ import ( "path/filepath" "github.com/alibaba/pouch/apis/types" - "github.com/alibaba/pouch/daemon/containerio" "github.com/alibaba/pouch/daemon/logger" + "github.com/alibaba/pouch/daemon/logger/jsonfile" + "github.com/alibaba/pouch/daemon/logger/syslog" "github.com/sirupsen/logrus" ) -func logOptionsForContainerio(c *Container) []func(*containerio.Option) { - optFuncs := make([]func(*containerio.Option), 0, 1) - +func logOptionsForContainerio(c *Container, info logger.Info) (logger.LogDriver, error) { cfg := c.HostConfig.LogConfig if cfg == nil || cfg.LogDriver == types.LogConfigLogDriverNone { - return optFuncs + return nil, nil } switch cfg.LogDriver { case types.LogConfigLogDriverJSONFile: - optFuncs = append(optFuncs, containerio.WithJSONFile()) + return jsonfile.Init(info) case types.LogConfigLogDriverSyslog: - optFuncs = append(optFuncs, containerio.WithSyslog()) + return syslog.Init(info) default: logrus.Warnf("not support (%v) log driver yet", cfg.LogDriver) + return nil, nil } - return optFuncs } // convContainerToLoggerInfo uses logger.Info to wrap container information. diff --git a/pkg/ioutils/writer.go b/pkg/ioutils/writer.go new file mode 100644 index 000000000..bf19af94a --- /dev/null +++ b/pkg/ioutils/writer.go @@ -0,0 +1,43 @@ +package ioutils + +import "io" + +// noopWriter is an io.Writer on which all Write calls succeed without +// doing anything. +type noopWriter struct{} + +func (nw *noopWriter) Write(p []byte) (int, error) { + return len(p), nil +} + +func (nw *noopWriter) Close() error { + return nil +} + +// NewNoopWriteCloser returns the no-op WriteCloser. +func NewNoopWriteCloser() io.WriteCloser { + return &noopWriter{} +} + +type writeCloserWrapper struct { + io.Writer + closeFunc func() error +} + +func (w *writeCloserWrapper) Close() error { + return w.closeFunc() +} + +// NewWriteCloserWrapper provides the ability to handle the cleanup during closer. +func NewWriteCloserWrapper(w io.Writer, closeFunc func() error) io.WriteCloser { + return &writeCloserWrapper{ + Writer: w, + closeFunc: closeFunc, + } +} + +// CloseWriter is an interface which represents the implementation closes the +// writing side of writer. +type CloseWriter interface { + CloseWrite() error +} diff --git a/pkg/streams/multi.go b/pkg/streams/multi.go new file mode 100644 index 000000000..18d2a1652 --- /dev/null +++ b/pkg/streams/multi.go @@ -0,0 +1,52 @@ +package streams + +import ( + "io" + "sync" + + "github.com/sirupsen/logrus" +) + +// multiWriter allows caller to broadcast data to several writers. +type multiWriter struct { + sync.Mutex + writers []io.WriteCloser +} + +// Add registers one writer into MultiWriter. +func (mw *multiWriter) Add(writer io.WriteCloser) { + mw.Lock() + mw.writers = append(mw.writers, writer) + mw.Unlock() +} + +// Write writes data into several writers and never returns error. +func (mw *multiWriter) Write(p []byte) (int, error) { + mw.Lock() + var evictIdx []int + for n, w := range mw.writers { + if _, err := w.Write(p); err != nil { + logrus.WithError(err).Debug("failed to write data") + + w.Close() + evictIdx = append(evictIdx, n) + } + } + + for n, i := range evictIdx { + mw.writers = append(mw.writers[:i-n], mw.writers[i-n+1:]...) + } + mw.Unlock() + return len(p), nil +} + +// Close closes all the writers and never returns error. 
+func (mw *multiWriter) Close() error { + mw.Lock() + for _, w := range mw.writers { + w.Close() + } + mw.writers = nil + mw.Unlock() + return nil +} diff --git a/pkg/streams/multi_test.go b/pkg/streams/multi_test.go new file mode 100644 index 000000000..cd8b47d6a --- /dev/null +++ b/pkg/streams/multi_test.go @@ -0,0 +1,66 @@ +package streams + +import ( + "bytes" + "fmt" + "reflect" + "testing" +) + +type badWriter struct{} + +func (w *badWriter) Write(_ []byte) (int, error) { + return 0, fmt.Errorf("oops") +} + +func (w *badWriter) Close() error { + return nil +} + +type bufferWrapper struct { + *bytes.Buffer +} + +func (w *bufferWrapper) Close() error { + return nil +} + +func TestMultiWriter(t *testing.T) { + mw := new(multiWriter) + + var ( + w1 = &badWriter{} + w2 = &bufferWrapper{bytes.NewBuffer(nil)} + w3 = &badWriter{} + ) + + mw.Add(w1) + mw.Add(w2) + mw.Add(w3) + + // step1: write hello + n, err := mw.Write([]byte("hello")) + if n != 5 || err != nil { + t.Fatalf("failed to write data: n=%v, err=%v", n, err) + } + + if len(mw.writers) != 1 || !reflect.DeepEqual(mw.writers[0], w2) { + t.Fatal("failed to evict the bad writer") + } + + // step2: write pouch + n, err = mw.Write([]byte("pouch")) + if n != 5 || err != nil { + t.Fatalf("failed to write data: n=%v, err=%v", n, err) + } + + if w2.String() != "hellopouch" { + t.Fatalf("failed to write data") + } + + // step3: close + mw.Close() + if len(mw.writers) != 0 { + t.Fatal("failed to remove all the writers after close") + } +} diff --git a/pkg/streams/stream.go b/pkg/streams/stream.go new file mode 100644 index 000000000..cb2c2dc06 --- /dev/null +++ b/pkg/streams/stream.go @@ -0,0 +1,103 @@ +package streams + +import ( + "io" + "sync" + + "github.com/alibaba/pouch/pkg/ioutils" + "github.com/alibaba/pouch/pkg/multierror" +) + +// NewStream returns new streams. +func NewStream() *Stream { + return &Stream{ + stdout: &multiWriter{}, + stderr: &multiWriter{}, + } +} + +// Stream is used to handle container IO. +type Stream struct { + sync.WaitGroup + stdin io.ReadCloser + stdinPipe io.WriteCloser + stdout, stderr *multiWriter +} + +// Stdin returns the Stdin for reader. +func (s *Stream) Stdin() io.ReadCloser { + return s.stdin +} + +// StdinPipe returns the Stdin for writer. +func (s *Stream) StdinPipe() io.WriteCloser { + return s.stdinPipe +} + +// NewStdinInput creates pipe for Stdin() and StdinPipe(). +func (s *Stream) NewStdinInput() { + s.stdin, s.stdinPipe = io.Pipe() +} + +// NewDiscardStdinInput creates a no-op WriteCloser for StdinPipe(). +func (s *Stream) NewDiscardStdinInput() { + s.stdin, s.stdinPipe = nil, ioutils.NewNoopWriteCloser() +} + +// Stdout returns the Stdout for writer. +func (s *Stream) Stdout() io.WriteCloser { + return s.stdout +} + +// Stderr returns the Stderr for writer. +func (s *Stream) Stderr() io.WriteCloser { + return s.stderr +} + +// AddStdoutWriter adds the stdout writer. +func (s *Stream) AddStdoutWriter(w io.WriteCloser) { + s.stdout.Add(w) +} + +// AddStderrWriter adds the stderr writer. +func (s *Stream) AddStderrWriter(w io.WriteCloser) { + s.stderr.Add(w) +} + +// NewStdoutPipe creates pipe and register it into Stdout. +func (s *Stream) NewStdoutPipe() io.ReadCloser { + r, w := io.Pipe() + s.stdout.Add(w) + return r +} + +// NewStderrPipe creates pipe and register it into Stderr. +func (s *Stream) NewStderrPipe() io.ReadCloser { + r, w := io.Pipe() + s.stderr.Add(w) + return r +} + +// Close closes streams. 
+func (s *Stream) Close() error {
+	multiErrs := new(multierror.Multierrors)
+
+	if s.stdin != nil {
+		if err := s.stdin.Close(); err != nil {
+			multiErrs.Append(err)
+		}
+	}
+
+	if err := s.stdout.Close(); err != nil {
+		multiErrs.Append(err)
+	}
+
+	if err := s.stderr.Close(); err != nil {
+		multiErrs.Append(err)
+	}
+
+	if multiErrs.Size() > 0 {
+		return multiErrs
+	}
+	return nil
+}
diff --git a/pkg/streams/utils.go b/pkg/streams/utils.go
new file mode 100644
index 000000000..29f31747b
--- /dev/null
+++ b/pkg/streams/utils.go
@@ -0,0 +1,185 @@
+package streams
+
+import (
+	"context"
+	"io"
+
+	"github.com/sirupsen/logrus"
+	"golang.org/x/sync/errgroup"
+)
+
+// Pipes represents any downstream pipe, for example, containerd's cio.
+type Pipes struct {
+	Stdin io.WriteCloser
+
+	Stdout io.ReadCloser
+
+	Stderr io.ReadCloser
+}
+
+// AttachConfig is used to describe how to attach the client's stream to
+// the process's stream.
+type AttachConfig struct {
+	Terminal bool
+
+	// CloseStdin means that when the caller closes the stdin of the client's
+	// stream, the stdin of the process's stream should be closed as well.
+	CloseStdin bool
+
+	// UseStdin/UseStdout/UseStderr indicate whether the corresponding client
+	// stream is set. It is hard to check io.Writer/io.ReadCloser != nil
+	// directly, because a non-nil interface holding a nil concrete value
+	// still makes (typ != nil) evaluate to true.
+	UseStdin, UseStdout, UseStderr bool
+
+	Stdin          io.ReadCloser
+	Stdout, Stderr io.Writer
+}
+
+// CopyPipes watches the given pipes and copies their data into the stream.
+//
+// NOTE: don't assign a concrete type to the Pipes fields, because the
+// Std* != nil checks would then always return true.
+func (s *Stream) CopyPipes(p Pipes) {
+	copyfn := func(styp string, w io.WriteCloser, r io.ReadCloser) {
+		s.Add(1)
+		go func() {
+			logrus.Debugf("start to copy %s from pipe", styp)
+			defer logrus.Debugf("stop copy %s from pipe", styp)
+
+			defer s.Done()
+			defer r.Close()
+
+			if _, err := io.Copy(w, r); err != nil {
+				logrus.WithError(err).Error("failed to copy pipe data")
+			}
+		}()
+	}
+
+	if p.Stdout != nil {
+		copyfn("stdout", s.Stdout(), p.Stdout)
+	}
+
+	if p.Stderr != nil {
+		copyfn("stderr", s.Stderr(), p.Stderr)
+	}
+
+	if s.stdin != nil && p.Stdin != nil {
+		go func() {
+			logrus.Debug("start to copy stdin from pipe")
+			defer logrus.Debug("stop copy stdin from pipe")
+
+			io.Copy(p.Stdin, s.stdin)
+			if err := p.Stdin.Close(); err != nil {
+				logrus.WithError(err).Error("failed to close pipe stdin")
+			}
+		}()
+	}
+}
+
+// Attach attaches the client's streams described by AttachConfig to the Stream.
+func (s *Stream) Attach(ctx context.Context, cfg *AttachConfig) <-chan error {
+	var (
+		group          errgroup.Group
+		stdout, stderr io.ReadCloser
+	)
+
+	if cfg.UseStdin {
+		group.Go(func() error {
+			logrus.Debug("start to attach stdin to stream")
+			defer logrus.Debug("stop attach stdin to stream")
+
+			defer func() {
+				if cfg.CloseStdin {
+					s.StdinPipe().Close()
+				}
+			}()
+
+			_, err := io.Copy(s.StdinPipe(), cfg.Stdin)
+			if err == io.ErrClosedPipe {
+				err = nil
+			}
+			return err
+		})
+	}
+
+	attachFn := func(styp string, w io.Writer, r io.ReadCloser) error {
+		logrus.Debugf("start to attach %s to stream", styp)
+		defer logrus.Debugf("stop attach %s to stream", styp)
+
+		defer func() {
+			// NOTE: when the stdout/stderr is closed, the stdin
+			// should be closed as well. For example, when the caller
+			// types the exit command, the stdout will be closed; in
+			// this case the stdin must be closed too, otherwise the
+			// caller will wait for the close signal forever.
+ if cfg.UseStdin { + cfg.Stdin.Close() + } + r.Close() + }() + + _, err := io.Copy(w, r) + if err == io.ErrClosedPipe { + err = nil + } + return err + } + + if cfg.UseStdout { + stdout = s.NewStdoutPipe() + group.Go(func() error { + return attachFn("stdout", cfg.Stdout, stdout) + }) + } + + if cfg.UseStderr { + stderr = s.NewStderrPipe() + group.Go(func() error { + return attachFn("stderr", cfg.Stderr, stderr) + }) + } + + var ( + errCh = make(chan error, 1) + groupErrCh = make(chan error, 1) + ) + + go func() { + defer close(groupErrCh) + groupErrCh <- group.Wait() + }() + + go func() { + defer logrus.Debug("the goroutine for attaching is done") + defer close(errCh) + + select { + case <-ctx.Done(): + if cfg.UseStdin { + cfg.Stdin.Close() + } + + // NOTE: the stdout writer will be evicted from stream in + // next Write call. + if cfg.UseStdout { + stdout.Close() + } + + // NOTE: the stderr writer will be evicted from stream in + // next Write call. + if cfg.UseStderr { + stderr.Close() + } + + if err := group.Wait(); err != nil { + errCh <- err + return + } + errCh <- ctx.Err() + case err := <-groupErrCh: + errCh <- err + } + }() + return errCh +} diff --git a/pkg/streams/utils_test.go b/pkg/streams/utils_test.go new file mode 100644 index 000000000..f811ed605 --- /dev/null +++ b/pkg/streams/utils_test.go @@ -0,0 +1,75 @@ +package streams + +import ( + "bytes" + "context" + "io" + "testing" +) + +func TestAttachWithCloseStdin(t *testing.T) { + var ( + aStdin = &bufferWrapper{bytes.NewBuffer(nil)} + aStdout = bytes.NewBuffer(nil) + aStderr = bytes.NewBuffer(nil) + ) + + attachCfg := &AttachConfig{ + UseStdin: true, + Stdin: aStdin, + UseStdout: true, + Stdout: aStdout, + UseStderr: false, + Stderr: aStderr, + CloseStdin: true, // must set it to true + } + + stream := NewStream() + stream.NewStdinInput() + + // write data into the aStdin + content := "" + for i := 0; i < 100; i++ { + d := "hello" + if _, err := aStdin.Write([]byte(d)); err != nil { + t.Fatalf("failed to write data: %v", err) + } + content += d + } + + // start attach stream + attachErr := stream.Attach(context.Background(), attachCfg) + + // write data into stderr, but the data never goes into the aStderr + stream.Stderr().Write([]byte("hello stderr")) + + // read from stdin and echo it into stdout + echoRout, echoW := io.Pipe() + go func() { + io.Copy(echoW, stream.Stdin()) + echoW.Close() + }() + + go func() { + io.Copy(stream.Stdout(), echoRout) + stream.Stdout().Close() + }() + + aStdin.Close() + if err := <-attachErr; err != nil { + t.Fatalf("failed to attach: %v", err) + } + + if got := aStdout.String(); got != content { + t.Fatalf("expected to get (%s), but got (%s)", content, got) + } + + // UseStderr is false + if aStderr.String() != "" { + t.Fatalf("should not get any data in stderr, but got %v", aStderr.String()) + } + + if err := stream.Close(); err != nil { + t.Fatalf("failed to stop stream: %v", err) + } +} diff --git a/test/cli_exec_test.go b/test/cli_exec_test.go index f7472c305..36d7596b8 100644 --- a/test/cli_exec_test.go +++ b/test/cli_exec_test.go @@ -2,8 +2,11 @@ package main import ( "bufio" + "context" + "fmt" "os/exec" "strings" + "time" "github.com/alibaba/pouch/test/command" "github.com/alibaba/pouch/test/environment" @@ -279,3 +282,19 @@ func (suite *PouchExecSuite) TestExecWithTty(c *check.C) { errString := attachRes.Stderr() assert.Equal(c, errString, "Error: the input device is not a TTY\n") } + +// TestExecForCloseIO test CloseIO works. 
+func (suite *PouchExecSuite) TestExecForCloseIO(c *check.C) { + name := "TestExecForCloseIO" + defer DelContainerForceMultyTime(c, name) + + command.PouchRun("run", "-d", "--name", name, busyboxImage, "top").Assert(c, icmd.Success) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + defer cancel() + + cmdLine := fmt.Sprintf("echo 1 | %s exec -i %s sh -c 'cat && echo hello'", environment.PouchBinary, name) + out, err := exec.CommandContext(ctx, "bash", "-c", cmdLine).Output() + c.Assert(err, check.IsNil) + c.Assert(string(out), check.Equals, "1\nhello\n") +} diff --git a/test/cli_run_interactive_test.go b/test/cli_run_interactive_test.go new file mode 100644 index 000000000..0893fcfba --- /dev/null +++ b/test/cli_run_interactive_test.go @@ -0,0 +1,84 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "os/exec" + "time" + + "github.com/alibaba/pouch/apis/types" + "github.com/alibaba/pouch/test/command" + "github.com/alibaba/pouch/test/environment" + + "github.com/go-check/check" + "github.com/gotestyourself/gotestyourself/icmd" +) + +// PouchRunInteractiveSuite is the test suite for run CLI. +type PouchRunInteractiveSuite struct{} + +func init() { + check.Suite(&PouchRunInteractiveSuite{}) +} + +// SetUpSuite does common setup in the beginning of each test suite. +func (suite *PouchRunInteractiveSuite) SetUpSuite(c *check.C) { + SkipIfFalse(c, environment.IsLinux) + PullImage(c, busyboxImage) +} + +// TestRunInteractive test "-i" option works. +func (suite *PouchRunInteractiveSuite) TestRunInteractive(c *check.C) { + name := "TestRunInteractiveContainer" + defer DelContainerForceMultyTime(c, name) + + // use pipe to act interactive + stdinR, stdinW := io.Pipe() + stdoutBuf := bytes.NewBuffer(nil) + defer stdinR.Close() + + // need to start command before write + cmd := command.PouchCmd("run", + "-i", "--net", "none", + "--name", name, busyboxImage, "cat") + cmd.Stdin = stdinR + cmd.Stdout = stdoutBuf + res := icmd.StartCmd(cmd) + + // send bye to cat + content := "byte\n" + fmt.Fprintf(stdinW, content) + stdinW.Close() + + res = icmd.WaitOnCmd(100*time.Second, res) + res.Assert(c, icmd.Success) + + // NOTE: container must be exited. + { + output := command.PouchRun("inspect", name).Stdout() + result := []types.ContainerJSON{} + if err := json.Unmarshal([]byte(output), &result); err != nil { + c.Errorf("failed to decode inspect output: %v", err) + } + c.Assert(string(result[0].State.Status), check.Equals, "exited") + } + + c.Assert(stdoutBuf.String(), check.Equals, content) +} + +// TestRunForCloseIO test CloseIO works. +func (suite *PouchRunInteractiveSuite) TestRunForCloseIO(c *check.C) { + name := "TestRunForCloseIO" + defer DelContainerForceMultyTime(c, name) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + defer cancel() + + cmdLine := fmt.Sprintf("echo 1 | %s run -i %s sh -c 'cat && echo hello'", environment.PouchBinary, busyboxImage) + out, err := exec.CommandContext(ctx, "bash", "-c", cmdLine).Output() + c.Assert(err, check.IsNil) + c.Assert(string(out), check.Equals, "1\nhello\n") +}
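Taken together, the new pkg/streams package replaces the per-backend plumbing that the deleted containerio files used to provide. The following sketch (illustrative only, not part of this patch; an in-memory io.Pipe stands in for the containerd fifo) shows the intended flow: create a Stream, feed it from the process side with CopyPipes, and attach a client through AttachConfig.

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"

	"github.com/alibaba/pouch/pkg/streams"
)

func main() {
	s := streams.NewStream()
	s.NewDiscardStdinInput() // no interactive stdin in this sketch

	// procR/procW stand in for the container process's stdout fifo.
	procR, procW := io.Pipe()
	s.CopyPipes(streams.Pipes{Stdout: procR})

	// Attach a client: everything the process writes ends up in clientOut.
	clientOut := bytes.NewBuffer(nil)
	errCh := s.Attach(context.Background(), &streams.AttachConfig{
		UseStdout: true,
		Stdout:    clientOut,
	})

	// The "process" writes some output and exits.
	procW.Write([]byte("hello from the container\n"))
	procW.Close()

	s.Wait()  // wait until the process output has been drained into the stream
	s.Close() // closing the stream ends the attached client's copy loop

	if err := <-errCh; err != nil {
		fmt.Println("attach error:", err)
	}
	fmt.Print(clientOut.String())
}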