[RFC] refactor: redesign container io in pouch #2375

Merged 1 commit on Nov 5, 2018

Contributor

@fuweid fuweid commented Oct 28, 2018

Signed-off-by: Wei Fu [email protected]

Ⅰ. Describe what this PR did

Redesign container IO in pouch. But why?

The original design always uses a ring buffer to cache data, which sometimes causes streams to lose data. That makes the regression tests unstable.

It also makes it hard for callers to reach the data coming from containerd-shim. That is why CRI is still missing ReopenLog. I think we should fix that.

design overview

[Figure: pouchcontainer io]

Each container/process holds a Stream struct to handle its IO streams. The struct looks like this:

// Stream is used to handle container IO.
type Stream struct {
        sync.WaitGroup
        stdin          io.ReadCloser
        stdinPipe      io.WriteCloser
        stdout, stderr *multiWriter
}

In the Stream struct, both stdout and stderr are multi-writer hubs, so the data from the process's stdout/stderr can be copied to several destinations. For example, when a user starts a container with run, the container's output is copied twice: once to the terminal and once to the container's log (json-file by default).

stdin does not need a hub like stdout/stderr, because a process has only one input. Any attach request can send data to the process in the container, so we export stdinPipe to let different attach requests write to the same input channel.

Based on this, the Stream acts as a bridge between attach requests and the streams of the process in the container. To attach to a running container, add the hijacked connection/stream to the output hub and connect it to stdinPipe. This makes the stream more flexible than before.

Before this change, CRI struggled to integrate with the Container Mgr, because the CRI component had no way of knowing that a stream had finished. It always polled the Exec status in a busy loop, which is not a good pattern.

Why add logdriver/crilog into the writer hub?

[Figure: pouchcontainer io 1]

Neither logdriver nor crilog acts like a plain io.Writer: each always adds metadata to the raw data. Adding the partial label is one example of how a logdriver behaves. To avoid duplicating this code in every driver, we use LogCopier to handle it.

There is a small difference between crilog and logdriver: crilog has the ReopenLog request, which adds a new crilog and closes the previous one.

In the current design, we might get duplicate data in the crilog: while we are closing the previous crilog, the writer hub might still write something into it. I think that is OK for now.

What are CloseIO and StdinOnce?

containerd-shim opens the stdin fifo twice, once for reading and once for writing. It does not close the write side of the stdin fifo until the client calls CloseIO. In some cases, the pouch daemon might crash before the input is finished. If the shim did not hold the write side of the fifo, the process in the container would treat that as an EOF signal and exit.

Because of this, when the client sends EOF on the input channel, pouchd should send CloseIO to the shim to let the process exit.
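
The EOF behaviour described above can be reproduced with an ordinary pipe. The sketch below is Unix-only and uses an anonymous pipe with a duplicated write descriptor as a stand-in for a fifo that is opened for writing twice; stdinEOFDemo and the variable names are made up for illustration. The reader sees EOF only after the last write end is closed, which is the role CloseIO plays for the shim.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"os"
	"syscall"
	"time"
)

// stdinEOFDemo reports whether the reader stays blocked while a second
// write end is still open, and whether it sees EOF once that end is closed.
func stdinEOFDemo() (blocked bool, eof bool, err error) {
	r, daemonW, err := os.Pipe()
	if err != nil {
		return false, false, err
	}
	defer r.Close()

	// Second write handle on the same pipe, playing the shim's part.
	fd, err := syscall.Dup(int(daemonW.Fd()))
	if err != nil {
		daemonW.Close()
		return false, false, err
	}
	shimW := os.NewFile(uintptr(fd), "shim-stdin")

	// The daemon goes away, but the shim still holds its write end.
	daemonW.Close()
	buf := make([]byte, 1)
	r.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
	_, rerr := r.Read(buf)
	blocked = errors.Is(rerr, os.ErrDeadlineExceeded) // no EOF yet

	// The equivalent of CloseIO: the last write end is closed.
	shimW.Close()
	r.SetReadDeadline(time.Time{})
	_, rerr = r.Read(buf)
	eof = rerr == io.EOF
	return blocked, eof, nil
}

func main() {
	blocked, eof, err := stdinEOFDemo()
	if err != nil {
		fmt.Println("demo failed:", err)
		return
	}
	fmt.Printf("blocked while shim holds the write end: %v, EOF after it closes: %v\n", blocked, eof)
}
```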

StdinOnce in the container's config is used by attach requests. If StdinOnce is true, pouchd closes the input of the process as soon as one attach request finishes its stream copy, so other attach requests to the same container are stopped.

A user who wants StdinOnce should set it to true when creating the container.

NOTE: doc link is here.

Ⅱ. Does this pull request fix one issue?

NONE

Ⅲ. Why don't you add test cases (unit test/integration test)? (Do you really think no tests are needed?)

Added it.

Ⅳ. Describe how to verify it

Wait for CI.

Ⅴ. Special notes for reviews

I removed the non-blocking feature in this change and will add it back in the next patch.

codecov bot commented Oct 28, 2018

Codecov Report

Merging #2375 into master will increase coverage by 0.26%.
The diff coverage is 76.55%.

@@            Coverage Diff             @@
##           master    #2375      +/-   ##
==========================================
+ Coverage   68.31%   68.58%   +0.26%     
==========================================
  Files         275      272       -3     
  Lines       18331    18206     -125     
==========================================
- Hits        12523    12486      -37     
+ Misses       4372     4302      -70     
+ Partials     1436     1418      -18
| Flag | Coverage Δ |
|---|---|
| #criv1alpha1test | 31.39% <61.97%> (-0.25%) ⬇️ |
| #criv1alpha2test | 35.46% <66.28%> (-0.16%) ⬇️ |
| #integrationtest | 40.01% <59.56%> (+0.19%) ⬆️ |
| #nodee2etest | 32.87% <51.71%> (-0.15%) ⬇️ |
| #unittest | 26.44% <16.94%> (+0.98%) ⬆️ |

| Impacted Files | Coverage Δ |
|---|---|
| cri/stream/remotecommand/httpstream.go | 46.63% <ø> (ø) ⬆️ |
| cri/v1alpha2/cri_utils.go | 91.21% <ø> (+0.69%) ⬆️ |
| cri/stream/remotecommand/attach.go | 64.7% <ø> (-1.97%) ⬇️ |
| cri/v1alpha1/cri_utils.go | 83.53% <ø> (+0.31%) ⬆️ |
| daemon/logger/syslog/syslog.go | 75.82% <100%> (+1.11%) ⬆️ |
| pkg/streams/multi.go | 100% <100%> (ø) |
| ctrd/watch.go | 83.33% <50%> (+4.54%) ⬆️ |
| daemon/mgr/container.go | 59.22% <62.68%> (-0.85%) ⬇️ |
| ctrd/container.go | 59.28% <63.06%> (+0.95%) ⬆️ |
| pkg/ioutils/writer.go | 66.66% <66.66%> (ø) |

... and 35 more

@fuweid fuweid changed the title [WIP] refactor: redesign container io in pouch [WIP|RFC] refactor: redesign container io in pouch Oct 28, 2018
@fuweid fuweid assigned starnop and unassigned starnop Oct 29, 2018
@fuweid fuweid requested a review from starnop October 29, 2018 07:10
Contributor Author

fuweid commented Oct 29, 2018

ping @starnop and @YaoZengzeng to help me review the cri part. Thanks

cli/run.go Outdated
@@ -71,6 +72,7 @@ func (rc *RunCommand) runRun(args []string) error {
}
containerName := rc.name
config.ContainerConfig.OpenStdin = rc.stdin
config.ContainerConfig.StdinOnce = rc.stdin // close stdin after attach connection closed
Contributor

If pouch run is used with both the -i and -d flags and the terminal then detaches, this container can't be attached again, right?

Contributor Author

good catch.

Contributor

@ZYecho ZYecho Oct 29, 2018

Maybe adding another flag for end users to control this would be a good choice?

Contributor Author

@fuweid fuweid Oct 29, 2018

That combination is already blocked by:

if (rc.attach || rc.stdin) && rc.detach {
        return fmt.Errorf("Conflicting options: -a (or -i) and -d")
}

Contributor Author

> Maybe adding another flag for end users to control this would be a good choice?

Basically, I don't think so. The CLI is just there to make pouch easier to use. This configuration is too complicated, and I don't see any requirement for it.

"github.com/alibaba/pouch/daemon/logger"
"github.com/alibaba/pouch/daemon/logger/crilog"
"github.com/alibaba/pouch/pkg/streams"
"github.com/sirupsen/logrus"
Contributor

add blank line before this line?

test/cli_run_interactive_test.go (resolved)

Contributor

ZYecho commented Oct 29, 2018

In this PR description, is the stdinPipe of the Stream struct shared if there is more than one attacher? @fuweid I don't understand the difference between stdinPipe and stdin in the Stream struct.

Contributor Author

fuweid commented Oct 29, 2018

> In this PR description, is the stdinPipe of the Stream struct shared if there is more than one attacher? @fuweid I don't understand the difference between stdinPipe and stdin in the Stream struct.

Yes, it can be shared by more than one attacher. I struggled with the naming: stdin is the reader side of the pipe and stdinPipe is the writer side.
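
The reader/writer split can be sketched with io.Pipe from the standard library. feedStdin is a hypothetical helper: each attacher writes into the same stdinPipe, and the process side reads everything from stdin. Whether pouch builds the pair exactly this way is an assumption based on the reply above.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// feedStdin lets every attacher write its input into one shared pipe and
// returns everything the process side read from it.
func feedStdin(attachers []string) string {
	stdin, stdinPipe := io.Pipe() // stdin: reader side, stdinPipe: writer side

	out := make(chan []byte, 1)
	go func() {
		data, _ := io.ReadAll(stdin) // stands in for the process reading its input
		out <- data
	}()

	var wg sync.WaitGroup
	for _, input := range attachers {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			io.WriteString(stdinPipe, s) // parallel writes to a pipe are serialized
		}(input)
	}
	wg.Wait()
	stdinPipe.Close() // the reader now sees EOF

	return string(<-out)
}

func main() {
	got := feedStdin([]string{"ls\n", "pwd\n"})
	fmt.Printf("process read %d bytes: %q\n", len(got), got)
}
```

The order of the two inputs depends on scheduling, which matches the point that all attachers feed a single input channel.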

cli/exec.go (resolved)

Contributor

starnop commented Oct 31, 2018

I have tested the function ReopenContainerLog. Congratulations! Everything is OK. After this pull request is merged, I will complete it. And then, LGTM. :)

if !stdin {
fifos.In = ""
if !withTerminal {
cfg.Stderr = filepath.Join(fifoDir, processID+"-stderr")
Contributor

I don't think the stderr fifo is related to whether the terminal is open.

Contributor Author

Yes. I checked containerd-shim: the shim opens a socket as the pty slave. There is only one output from the pty slave, so we don't need the stderr fifo if withTerminal is true.

}
if g.dir != "" {
return os.RemoveAll(g.dir)
Contributor

Before, we removed the fifo dir here; now that is moved to closeFn. Does it matter?

Contributor Author

I don't think so. Before, we needed the dir for cleanup. I moved it to closeFn, like [email protected]. All of the changes in cio.go are in preparation for upgrading containerd.

func (g *wgCloser) Cancel() {
g.cancel()
// TODO(fuweid): pipes will be removed when update vendor to [email protected].
type pipes struct {
Contributor

Did we use pipes (direct IO) before? I will check further.

}

func copyIO(fifos *containerdio.FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
func openPipes(ctx context.Context, fifos *CioFIFOSet) (_ pipes, err0 error) {
Contributor

I see, you implemented pipes for use here.

Collaborator

rudyfly commented Nov 1, 2018

Contributor Author

fuweid commented Nov 2, 2018

any update @Ace-Tang @YaoZengzeng


// Both the caller and container/exec process holds write side pipe
// for the stdin. When the caller closes the write pipe, the process doesn't
// exit until the caller.
Contributor

Maybe we missed some comment here? 😆

Contributor Author

Yes. Wait() makes sure pouchd receives all the data from the shim, and Close() makes sure Attach() exits because there is no more data. Thanks @YaoZengzeng. In the cleanup, we should Wait() before Close(). I missed that and will update. 👍

Contributor Author

fuweid commented Nov 4, 2018

@YaoZengzeng I have updated it so that the containerio Close action includes the Wait action. However, just in case, we wait at most 10 seconds for Wait to return. If it has not returned by then, we go ahead.

mw.writers = append(mw.writers[:i-n], mw.writers[i-n+1:]...)
}
mw.Unlock()
return len(p), nil
Contributor

If all of the writers fail to write the bytes here, this still returns len(p). Is that reasonable?

Contributor Author

Since we don't return an error here, we need to return len(p). The writer hub uses io.Pipe as the io.WriteCloser, so all data is handled in memory, not through real file or network IO. If it runs into an error, something is wrong with pouch and we should restart pouchd.

I think I can add a warning message here if it runs into an error during writing. What do you think?

Contributor

I see. Printing a warning message is enough.

Contributor

@zhuangqh zhuangqh left a comment

I am confused by the lastErr pattern: you will lose some error messages. What about using multierror or printing a log?
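
To illustrate the concern, here is a small hand-rolled aggregate error that keeps every failure instead of only the last one. It is a sketch of the general idea, not the multierror library the reviewer may have had in mind; multiError, closeAll and the two closer types are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// multiError keeps every error instead of only the last one.
type multiError []error

func (m multiError) Error() string {
	msgs := make([]string, len(m))
	for i, err := range m {
		msgs[i] = err.Error()
	}
	return strings.Join(msgs, "; ")
}

// closeAll closes every closer and reports all failures together.
func closeAll(closers ...io.Closer) error {
	var errs multiError
	for _, c := range closers {
		if err := c.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	if len(errs) == 0 {
		return nil // return a plain nil, not an empty multiError
	}
	return errs
}

// badCloser always fails; okCloser always succeeds.
type badCloser string

func (b badCloser) Close() error { return errors.New(string(b) + ": close failed") }

type okCloser struct{}

func (okCloser) Close() error { return nil }

func main() {
	err := closeAll(badCloser("stdout"), okCloser{}, badCloser("stderr"))
	fmt.Println(err)
}
```

With a lastErr variable, only the stderr failure would be reported here; the aggregate keeps both.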

func (s *Stream) CopyPipes(p Pipes) {
copyfn := func(styp string, w io.WriteCloser, r io.ReadCloser) {
s.Add(1)
go func() {
Contributor

This is controlled by the WaitGroup to guarantee that all of the data has finished copying (an abnormal exit also counts as finished). Just commenting to record this.

}

if s.stdin != nil && p.Stdin != nil {
go func() {
Contributor

This is not controlled by the WaitGroup, because it may be closed suddenly by the user. Just commenting to record this.

}

attach.UseStdin = req.FormValue("stdin") == "1"
Contributor

Use httputils.BoolValue(req, "stdin") instead?

@allencloud allencloud changed the title [WIP|RFC] refactor: redesign container io in pouch [RFC] refactor: redesign container io in pouch Nov 5, 2018
@allencloud allencloud added the priority/P1 this is high priority that all maintainers should stop to handle this issue label Nov 5, 2018
Contributor

Ace-Tang commented Nov 5, 2018

LGTM, more test later.

@pouchrobot pouchrobot added the LGTM one maintainer or community participant agrees to merge the pull reuqest. label Nov 5, 2018
Contributor Author

fuweid commented Nov 5, 2018

ping @zhuangqh I updated it following your recommendation. Please take a look.

return fifos, nil
closeFn := func() error {
if err := os.RemoveAll(fifoDir); err != nil {
logrus.WithError(err).Warnf("failed to remove process(id=%v) fifo dir", processID)
Collaborator

Why not return error?

Contributor Author

got it.

// NewFifos returns a new set of fifos for the task
func NewFifos(id string, stdin bool) (*containerdio.FIFOSet, error) {
// NewCioFIFOSet prepares fifo files.
func NewCioFIFOSet(processID string, withStdin bool, withTerminal bool) (*CioFIFOSet, error) {
root := "/run/containerd/fifo"
Collaborator

Can the io root directory be made configurable?

Contributor Author

Yes, but I just kept it unchanged. If we have this requirement, we can file another PR to handle it.


err = c.ContainerMgr.StartExec(ctx, execid, attachConfig)
if err != nil {
if err := c.ContainerMgr.StartExec(ctx, execid, attachCfg); err != nil {
return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err)
Collaborator

Change fmt.Errorf to errors.Wrapf?

Contributor Author

Let's keep it unchanged and file another PR to handle this.

@@ -27,6 +26,7 @@ import (
"github.com/alibaba/pouch/pkg/errtypes"
"github.com/alibaba/pouch/pkg/meta"
"github.com/alibaba/pouch/pkg/reference"
pkgstreams "github.com/alibaba/pouch/pkg/streams"
Collaborator

Why not use streams?

Contributor Author

Because cri has a package named streams. I just use pkgstreams to distinguish the two packages.

Collaborator

@rudyfly rudyfly Nov 5, 2018

Emmmmm, the cri package is stream, your package is streams, he he he he

}, nil
execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid)
if err != nil {
return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err)
Collaborator

Can we change all fmt.Errorf("failed to xxxx %q: %v", id, err) calls to errors.Wrapf()?

@@ -791,3 +831,107 @@ func withCheckpointOpt(checkpoint *containerdtypes.Descriptor) containerd.NewTas
return nil
}
}

// InitStdio allows caller to handle any initialize job.
Collaborator

s/job/jobs?

Contributor Author

I think job is ok 😂

type streamType string

const (
// Stdout stream type.
Collaborator

s/streamStdout/Stdout

const (
// Stdout stream type.
streamStdout streamType = "stdout"
// Stderr stream type.
Collaborator

s/streamStderr/Stderr

options = append(options, containerio.WithStdin(attach.Stdin))
} else {
options = append(options, containerio.WithDiscard())
if cntrio = mgr.IOs.Get(c.ID); cntrio == nil {
Collaborator

Is cntrio OK as the abbreviation for containerIO? In some places container is abbreviated as ctr.

Contributor Author

container -> cntr I think it's ok 😅

@YaoZengzeng
Contributor

LGTM
@fuweid Your work is really brilliant 😎

Contributor Author

fuweid commented Nov 5, 2018

ping @zhuangqh @rudyfly I changed cntrio -> ctrio in the containerio package.

Contributor

@zhuangqh zhuangqh left a comment

LGTM

@allencloud allencloud merged commit b83956b into AliyunContainerService:master Nov 5, 2018
@fuweid fuweid deleted the refactor_stream_io branch November 5, 2018 08:16
Labels
kind/design kind/refactor LGTM one maintainer or community participant agrees to merge the pull reuqest. priority/P1 this is high priority that all maintainers should stop to handle this issue size/XXL
10 participants