Skip to content

Commit

Permalink
Merge pull request #3475 from tstromberg/intlogs
Browse files Browse the repository at this point in the history
tee party: Stream cmd output to tests when -v is enabled, and stream SSH output to logs
  • Loading branch information
tstromberg authored Dec 21, 2018
2 parents 94245d0 + b5185c6 commit ae4221b
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 40 deletions.
95 changes: 76 additions & 19 deletions pkg/minikube/bootstrapper/ssh_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package bootstrapper

import (
"bytes"
"fmt"
"io"
"path"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/crypto/ssh"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/util"
)

// SSHRunner runs commands through SSH.
Expand All @@ -52,25 +54,80 @@ func (s *SSHRunner) Remove(f assets.CopyableFile) error {
return sess.Run(cmd)
}

type singleWriter struct {
b bytes.Buffer
mu sync.Mutex
}

func (w *singleWriter) Write(p []byte) (int, error) {
w.mu.Lock()
defer w.mu.Unlock()
return w.b.Write(p)
}

// teeSSH runs an SSH command, streaming stdout, stderr to logs
func teeSSH(s *ssh.Session, cmd string, outB io.Writer, errB io.Writer) error {
outPipe, err := s.StdoutPipe()
if err != nil {
return errors.Wrap(err, "stdout")
}

errPipe, err := s.StderrPipe()
if err != nil {
return errors.Wrap(err, "stderr")
}
var wg sync.WaitGroup
wg.Add(2)

go func() {
if err := util.TeePrefix(util.ErrPrefix, errPipe, errB, glog.Infof); err != nil {
glog.Errorf("tee stderr: %v", err)
}
wg.Done()
}()
go func() {
if err := util.TeePrefix(util.OutPrefix, outPipe, outB, glog.Infof); err != nil {
glog.Errorf("tee stdout: %v", err)
}
wg.Done()
}()
err = s.Run(cmd)
wg.Wait()
return err
}

// Run starts a command on the remote and waits for it to return.
func (s *SSHRunner) Run(cmd string) error {
glog.Infoln("Run:", cmd)
glog.Infof("SSH: %s", cmd)
sess, err := s.c.NewSession()
if err != nil {
return errors.Wrap(err, "getting ssh session")
return errors.Wrap(err, "NewSession")
}
defer sess.Close()
return sess.Run(cmd)

defer func() {
if err := sess.Close(); err != nil {
if err != io.EOF {
glog.Errorf("session close: %v", err)
}
}
}()
var outB bytes.Buffer
var errB bytes.Buffer
err = teeSSH(sess, cmd, &outB, &errB)
if err != nil {
return errors.Wrapf(err, "command failed: %s\nstdout: %s\nstderr: %s", cmd, outB.String(), errB.String())
}
return nil
}

// CombinedOutputTo runs the command and stores both command
// output and error to out.
func (s *SSHRunner) CombinedOutputTo(cmd string, out io.Writer) error {
b, err := s.CombinedOutput(cmd)
func (s *SSHRunner) CombinedOutputTo(cmd string, w io.Writer) error {
out, err := s.CombinedOutput(cmd)
if err != nil {
return errors.Wrapf(err, "running command: %s\n.", cmd)
return err
}
_, err = out.Write([]byte(b))
_, err = w.Write([]byte(out))
return err
}

Expand All @@ -80,15 +137,17 @@ func (s *SSHRunner) CombinedOutput(cmd string) (string, error) {
glog.Infoln("Run with output:", cmd)
sess, err := s.c.NewSession()
if err != nil {
return "", errors.Wrap(err, "getting ssh session")
return "", errors.Wrap(err, "NewSession")
}
defer sess.Close()

b, err := sess.CombinedOutput(cmd)
var combined singleWriter
err = teeSSH(sess, cmd, &combined, &combined)
out := combined.b.String()
if err != nil {
return "", errors.Wrapf(err, "running command: %s\n, output: %s", cmd, string(b))
return "", err
}
return string(b), nil
return out, nil
}

// Copy copies a file to the remote over SSH.
Expand All @@ -97,18 +156,18 @@ func (s *SSHRunner) Copy(f assets.CopyableFile) error {
mkdirCmd := fmt.Sprintf("sudo mkdir -p %s", f.GetTargetDir())
for _, cmd := range []string{deleteCmd, mkdirCmd} {
if err := s.Run(cmd); err != nil {
return errors.Wrapf(err, "Error running command: %s", cmd)
return errors.Wrapf(err, "pre-copy")
}
}

sess, err := s.c.NewSession()
if err != nil {
return errors.Wrap(err, "Error creating new session via ssh client")
return errors.Wrap(err, "NewSession")
}

w, err := sess.StdinPipe()
if err != nil {
return errors.Wrap(err, "Error accessing StdinPipe via ssh session")
return errors.Wrap(err, "StdinPipe")
}
// The scpcmd below *should not* return until all data is copied and the
// StdinPipe is closed. But let's use a WaitGroup to make it expicit.
Expand All @@ -123,12 +182,10 @@ func (s *SSHRunner) Copy(f assets.CopyableFile) error {
fmt.Fprint(w, "\x00")
}()

scpcmd := fmt.Sprintf("sudo scp -t %s", f.GetTargetDir())
out, err := sess.CombinedOutput(scpcmd)
_, err = sess.CombinedOutput(fmt.Sprintf("sudo scp -t %s", f.GetTargetDir()))
if err != nil {
return errors.Wrapf(err, "Error running scp command: %s output: %s", scpcmd, out)
return err
}
wg.Wait()

return nil
}
33 changes: 33 additions & 0 deletions pkg/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package util

import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
Expand All @@ -33,6 +35,9 @@ import (
"github.com/pkg/errors"
)

const ErrPrefix = "! "
const OutPrefix = "> "

const (
downloadURL = "https://storage.googleapis.com/minikube/releases/%s/minikube-%s-amd64%s"
)
Expand Down Expand Up @@ -201,3 +206,31 @@ func MaybeChownDirRecursiveToMinikubeUser(dir string) error {
}
return nil
}

// TeePrefix copies bytes from a reader to writer, logging each new line.
func TeePrefix(prefix string, r io.Reader, w io.Writer, logger func(format string, args ...interface{})) error {
scanner := bufio.NewScanner(r)
scanner.Split(bufio.ScanBytes)
var line bytes.Buffer

for scanner.Scan() {
b := scanner.Bytes()
if _, err := w.Write(b); err != nil {
return err
}

if bytes.IndexAny(b, "\r\n") == 0 {
if line.Len() > 0 {
logger("%s%s", prefix, line.String())
line.Reset()
}
continue
}
line.Write(b)
}
// Catch trailing output in case stream does not end with a newline
if line.Len() > 0 {
logger("%s%s", prefix, line.String())
}
return nil
}
40 changes: 40 additions & 0 deletions pkg/util/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ limitations under the License.
package util

import (
"bytes"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"

"github.com/pkg/errors"
Expand Down Expand Up @@ -158,3 +162,39 @@ func TestGetBinaryDownloadURL(t *testing.T) {
}

}

func TestTeePrefix(t *testing.T) {
var in bytes.Buffer
var out bytes.Buffer
var logged strings.Builder

logSink := func(format string, args ...interface{}) {
logged.WriteString("(" + fmt.Sprintf(format, args...) + ")")
}

// Simulate the primary use case: tee in the background. This also helps avoid I/O races.
var wg sync.WaitGroup
wg.Add(1)
go func() {
TeePrefix(":", &in, &out, logSink)
wg.Done()
}()

in.Write([]byte("goo"))
in.Write([]byte("\n"))
in.Write([]byte("g\r\n\r\n"))
in.Write([]byte("le"))
wg.Wait()

gotBytes := out.Bytes()
wantBytes := []byte("goo\ng\r\n\r\nle")
if !bytes.Equal(gotBytes, wantBytes) {
t.Errorf("output=%q, want: %q", gotBytes, wantBytes)
}

gotLog := logged.String()
wantLog := "(:goo)(:g)(:le)"
if gotLog != wantLog {
t.Errorf("log=%q, want: %q", gotLog, wantLog)
}
}
22 changes: 11 additions & 11 deletions test/integration/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,32 +40,32 @@ func TestDocker(t *testing.T) {
mk.RunWithContext(ctx, "delete")

startCmd := fmt.Sprintf("start %s %s %s", mk.StartArgs, mk.Args,
"--docker-env=FOO=BAR --docker-env=BAZ=BAT --docker-opt=debug --docker-opt=icc=true")
out, err := mk.RunWithContext(ctx, startCmd)
"--docker-env=FOO=BAR --docker-env=BAZ=BAT --docker-opt=debug --docker-opt=icc=true --alsologtostderr --v=5")
stdout, stderr, err := mk.RunWithContext(ctx, startCmd)
if err != nil {
t.Fatalf("start: %v\nstart out: %s", err, out)
t.Fatalf("start: %v\nstdout: %s\nstderr: %s", err, stdout, stderr)
}

mk.EnsureRunning()

out, err = mk.RunWithContext(ctx, "ssh -- systemctl show docker --property=Environment --no-pager")
stdout, stderr, err = mk.RunWithContext(ctx, "ssh -- systemctl show docker --property=Environment --no-pager")
if err != nil {
t.Errorf("docker env: %v\ndocker env out: %s", err, out)
t.Errorf("docker env: %v\nstderr: %s", err, stderr)
}

for _, envVar := range []string{"FOO=BAR", "BAZ=BAT"} {
if !strings.Contains(string(out), envVar) {
t.Errorf("Env var %s missing: %s.", envVar, out)
if !strings.Contains(stdout, envVar) {
t.Errorf("Env var %s missing: %s.", envVar, stdout)
}
}

out, err = mk.RunWithContext(ctx, "ssh -- systemctl show docker --property=ExecStart --no-pager")
stdout, stderr, err = mk.RunWithContext(ctx, "ssh -- systemctl show docker --property=ExecStart --no-pager")
if err != nil {
t.Errorf("ssh show docker: %v\nshow docker out: %s", err, out)
t.Errorf("ssh show docker: %v\nstderr: %s", err, stderr)
}
for _, opt := range []string{"--debug", "--icc=true"} {
if !strings.Contains(string(out), opt) {
t.Fatalf("Option %s missing from ExecStart: %s.", opt, out)
if !strings.Contains(stdout, opt) {
t.Fatalf("Option %s missing from ExecStart: %s.", opt, stdout)
}
}
}
Loading

0 comments on commit ae4221b

Please sign in to comment.