Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] refactor: redesign container io in pouch #2375

Merged
merged 1 commit into from
Nov 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions apis/server/container_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/httputils"
"github.com/alibaba/pouch/pkg/streams"
"github.com/alibaba/pouch/pkg/utils"
"github.com/alibaba/pouch/pkg/utils/filters"

Expand Down Expand Up @@ -284,26 +286,40 @@ func (s *Server) renameContainer(ctx context.Context, rw http.ResponseWriter, re

func (s *Server) attachContainer(ctx context.Context, rw http.ResponseWriter, req *http.Request) error {
name := mux.Vars(req)["name"]

_, upgrade := req.Header["Upgrade"]

hijacker, ok := rw.(http.Hijacker)
if !ok {
return fmt.Errorf("not a hijack connection, container: %s", name)
}
var (
err error
closeFn func() error
attach = new(streams.AttachConfig)
stdin io.ReadCloser
stdout io.Writer
)

attach := &mgr.AttachConfig{
Hijack: hijacker,
Stdin: req.FormValue("stdin") == "1",
Stdout: true,
Stderr: true,
Upgrade: upgrade,
stdin, stdout, closeFn, err = openHijackConnection(rw)
if err != nil {
return err
}

if err := s.ContainerMgr.Attach(ctx, name, attach); err != nil {
// TODO handle error
// close hijack stream
defer closeFn()

if upgrade {
fmt.Fprintf(stdout, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n")
} else {
fmt.Fprintf(stdout, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
}

attach.UseStdin = httputils.BoolValue(req, "stdin")
attach.Stdin = stdin
attach.UseStdout = true
attach.Stdout = stdout
attach.UseStderr = true
attach.Stderr = stdout

if err := s.ContainerMgr.AttachContainerIO(ctx, name, attach); err != nil {
stdout.Write([]byte(err.Error() + "\r\n"))
}
return nil
}

Expand Down
65 changes: 52 additions & 13 deletions apis/server/exec_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"

"github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/httputils"
"github.com/alibaba/pouch/pkg/streams"

"github.com/docker/docker/pkg/stdcopy"
"github.com/go-openapi/strfmt"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -64,28 +66,47 @@ func (s *Server) startContainerExec(ctx context.Context, rw http.ResponseWriter,
return err
}

var attach *mgr.AttachConfig
var (
err error
closeFn func() error
attach = new(streams.AttachConfig)
stdin io.ReadCloser
stdout io.Writer
)

// TODO(huamin.thm): support detach exec process through http post method
if !config.Detach {
hijacker, ok := rw.(http.Hijacker)
if !ok {
return fmt.Errorf("not a hijack connection, container: %s", name)
stdin, stdout, closeFn, err = openHijackConnection(rw)
if err != nil {
return err
}

attach = &mgr.AttachConfig{
Hijack: hijacker,
Stdin: config.Tty,
Stdout: true,
Stderr: true,
Upgrade: upgrade,
// close hijack stream
defer closeFn()

if upgrade {
fmt.Fprintf(stdout, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n")
} else {
fmt.Fprintf(stdout, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
}

attach.UseStdin, attach.Stdin = true, stdin
attach.Terminal = config.Tty

if config.Tty {
attach.UseStdout, attach.Stdout = true, stdout
} else {
attach.UseStdout, attach.Stdout = true, stdcopy.NewStdWriter(stdout, stdcopy.Stdout)
attach.UseStderr, attach.Stderr = true, stdcopy.NewStdWriter(stdout, stdcopy.Stderr)
}
}

if err := s.ContainerMgr.StartExec(ctx, name, attach); err != nil {
logrus.Errorf("failed to run exec process: %s", err)
if config.Detach {
return err
}
attach.Stdout.Write([]byte(err.Error() + "\r\n"))
}

return nil
}

Expand Down Expand Up @@ -124,3 +145,21 @@ func (s *Server) resizeExec(ctx context.Context, rw http.ResponseWriter, req *ht
return nil

}

func openHijackConnection(rw http.ResponseWriter) (io.ReadCloser, io.Writer, func() error, error) {
hijacker, ok := rw.(http.Hijacker)
if !ok {
return nil, nil, nil, fmt.Errorf("not a hijack connection")
}

conn, _, err := hijacker.Hijack()
if err != nil {
return nil, nil, nil, err
}

// set raw mode
conn.Write([]byte{})
return conn, conn, func() error {
return conn.Close()
}, nil
}
6 changes: 5 additions & 1 deletion cli/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"

"github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/pkg/ioutils"

"github.com/docker/docker/pkg/stdcopy"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -145,9 +146,12 @@ func holdHijackConnection(ctx context.Context, conn net.Conn, reader *bufio.Read
go func() {
if stdin {
io.Copy(conn, os.Stdin)
// close write if receive CTRL-D
if cw, ok := conn.(ioutils.CloseWriter); ok {
fuweid marked this conversation as resolved.
Show resolved Hide resolved
cw.CloseWrite()
}
}

// TODO: close write side of conn
close(stdinDone)
}()

Expand Down
5 changes: 5 additions & 0 deletions cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/pkg/ioutils"

"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -135,6 +136,10 @@ func (rc *RunCommand) runRun(args []string) error {
}()
go func() {
io.Copy(conn, os.Stdin)
// close write if receive CTRL-D
if cw, ok := conn.(ioutils.CloseWriter); ok {
cw.CloseWrite()
}
}()
}

Expand Down
1 change: 0 additions & 1 deletion cri/stream/remotecommand/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func ServeAttach(ctx context.Context, w http.ResponseWriter, req *http.Request,
defer streamCtx.conn.Close()

err := attacher.Attach(ctx, container, streamOpts, &Streams{
StreamCh: make(chan struct{}, 1),
StdinStream: streamCtx.stdinStream,
StdoutStream: streamCtx.stdoutStream,
StderrStream: streamCtx.stderrStream,
Expand Down
2 changes: 0 additions & 2 deletions cri/stream/remotecommand/httpstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ type Options struct {
// Streams contains all the streams used to stdio for
// remote command execution.
type Streams struct {
// Notified from StreamCh if streams broken.
StreamCh chan struct{}
StdinStream io.ReadCloser
StdoutStream io.WriteCloser
StderrStream io.WriteCloser
Expand Down
62 changes: 27 additions & 35 deletions cri/stream/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"io"
"os/exec"
"strings"
"time"

apitypes "github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/cri/stream/remotecommand"
"github.com/alibaba/pouch/daemon/mgr"
pkgstreams "github.com/alibaba/pouch/pkg/streams"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -52,38 +52,31 @@ func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []stri
return 0, fmt.Errorf("failed to create exec for container %q: %v", containerID, err)
}

attachConfig := &mgr.AttachConfig{
Stdin: streamOpts.Stdin,
Streams: streams,
MuxDisabled: true,
}

err = s.containerMgr.StartExec(ctx, execid, attachConfig)
if err != nil {
return 0, fmt.Errorf("failed to start exec for container %q: %v", containerID, err)
}

handleResizing(containerID, execid, resizeChan, func(size apitypes.ResizeOptions) {
err := s.containerMgr.ResizeExec(ctx, execid, size)
if err != nil {
logrus.Errorf("failed to resize process %q console for container %q: %v", execid, containerID, err)
}
})

// TODO Find a better way instead of the dead loop
var ei *apitypes.ContainerExecInspect
for {
ei, err = s.containerMgr.InspectExec(ctx, execid)
if err != nil {
return 0, fmt.Errorf("failed to inspect exec for container %q: %v", containerID, err)
}
// Loop until exec finished.
if !ei.Running {
break
}
time.Sleep(100 * time.Millisecond)
attachCfg := &pkgstreams.AttachConfig{
UseStdin: createConfig.AttachStdin,
Stdin: streams.StdinStream,
UseStdout: createConfig.AttachStdout,
Stdout: streams.StdoutStream,
UseStderr: createConfig.AttachStderr,
Stderr: streams.StderrStream,
Terminal: createConfig.Tty,
}

if err := s.containerMgr.StartExec(ctx, execid, attachCfg); err != nil {
return 0, fmt.Errorf("failed to exec for container %q: %v", containerID, err)
}

ei, err := s.containerMgr.InspectExec(ctx, execid)
if err != nil {
return 0, fmt.Errorf("failed to inspect exec for container %q: %v", containerID, err)
}
return uint32(ei.ExitCode), nil
}

Expand All @@ -110,20 +103,19 @@ func handleResizing(containerID, execID string, resizeChan <-chan apitypes.Resiz

// Attach attaches to a running container.
func (s *streamRuntime) Attach(ctx context.Context, containerID string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error {
attachConfig := &mgr.AttachConfig{
Stdin: streamOpts.Stdin,
Stdout: streamOpts.Stdout,
Stderr: streamOpts.Stderr,
Streams: streams,
// TODO(fuweid): could we close stdin after stop attach?
attachCfg := &pkgstreams.AttachConfig{
UseStdin: streamOpts.Stdin,
Stdin: streams.StdinStream,
UseStdout: streamOpts.Stdout,
Stdout: streams.StdoutStream,
UseStderr: streamOpts.Stderr,
Stderr: streams.StderrStream,
Terminal: streamOpts.TTY,
}

err := s.containerMgr.Attach(ctx, containerID, attachConfig)
if err != nil {
if err := s.containerMgr.AttachContainerIO(ctx, containerID, attachCfg); err != nil {
return fmt.Errorf("failed to attach to container %q: %v", containerID, err)
}

<-streams.StreamCh

return nil
}

Expand Down
Loading