From c8e0a5247dee453e3070c0ac5d73d8af1527176e Mon Sep 17 00:00:00 2001 From: Thomas Hallgren Date: Thu, 18 Jul 2024 18:41:42 +0200 Subject: [PATCH 1/5] Improve the logger and make it play well with docker plugin output. Signed-off-by: Thomas Hallgren --- main.go | 4 ++ pkg/log/log.go | 117 ++++++++++++++++++++++++++++++++++------------ pkg/sftp/mount.go | 10 +++- 3 files changed, 100 insertions(+), 31 deletions(-) diff --git a/main.go b/main.go index eeeb01a..23ac2f6 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "os" "strconv" @@ -17,6 +18,9 @@ func main() { ok, _ = strconv.ParseBool(debug) log.SetDebug(ok) } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + log.Start(ctx.Done()) if err := volume.NewHandler(sftp.NewDriver()).ServeUnix(pluginSocket, 0); err != nil { log.Fatal(err) diff --git a/pkg/log/log.go b/pkg/log/log.go index f84d73c..f6303cb 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -8,67 +8,126 @@ package log import ( + "bytes" "fmt" "io" + "math" "os" + "slices" + "strings" + "sync" + "time" ) -var nl = []byte{'\n'} +var msgChan = make(chan string, 10) var debug = false -func SetDebug(flag bool) { - debug = flag +func Start(doneCh <-chan struct{}) { + // This code is designed to send the log as a serialized stream of batches that are at least + // 50 ms apart. + // For reasons unknown, logging too quickly will result in lost log output when capturing + // the output under /run/docker/plugins. + w := os.Stderr + var buf bytes.Buffer + bufLock := sync.Mutex{} + fire := time.AfterFunc(time.Duration(math.MaxInt64), func() { + bufLock.Lock() + if buf.Len() > 0 { + _, _ = w.Write(buf.Bytes()) + buf.Reset() + } + bufLock.Unlock() + }) + + go func() { + for { + select { + case msg := <-msgChan: + bufLock.Lock() + buf.WriteString(msg) + buf.WriteByte('\n') + bufLock.Unlock() + fire.Reset(50 * time.Millisecond) + case <-doneCh: + fire.Stop() + // Output any remaining logs. + // No need for locks now. The timer is dead. + for len(msgChan) > 0 { + buf.WriteString(<-msgChan) + buf.WriteByte('\n') + } + if buf.Len() > 0 { + _, _ = w.Write(buf.Bytes()) + } + return + } + } + }() } -func Error(v any) { - switch v := v.(type) { - case nil: - case error: - _, _ = fmt.Fprintln(os.Stderr, v.Error()) - case string: - _, _ = fmt.Fprintln(os.Stderr, v) - case fmt.Stringer: - _, _ = fmt.Fprintln(os.Stderr, v) - default: - err := fmt.Errorf("%v", v) - _, _ = fmt.Fprintln(os.Stderr, err.Error()) +type logWriter string + +func (w logWriter) Write(p []byte) (n int, err error) { + ps := string(p) + if len(ps) > 0 { + for _, line := range strings.Split(string(p), "\n") { + if len(line) > 0 { + tsPrintln(string(w), line) + } + } } + return len(p), nil +} + +func Stdlog(level string) io.Writer { + return logWriter(level) +} + +func SetDebug(flag bool) { + debug = flag } func IsDebug() bool { return debug } -func Errorf(format string, args ...any) { - Error(fmt.Errorf(format, args...)) +func Fatal(args ...any) { + Error(args...) + os.Exit(1) } -func Fatal(v any) { - Error(v) - os.Exit(1) +func Error(args ...any) { + tsPrintln("error", args...) } -func Info(v any) { - _, _ = fmt.Fprintln(os.Stderr, v) +func Errorf(format string, args ...any) { + tsPrintf("error", format, args...) +} + +func Info(args ...any) { + tsPrintln("info", args...) } func Infof(format string, args ...any) { - fprintfln(os.Stderr, format, args...) + tsPrintf("info", format, args...) } -func Debug(v any) { +func Debug(args ...any) { if debug { - Info(v) + tsPrintln("debug", args...) } } func Debugf(format string, args ...any) { if debug { - Infof(format, args...) + tsPrintf("debug", format, args...) } } -func fprintfln(w io.Writer, format string, args ...any) { - _, _ = fmt.Fprintf(w, format, args...) - _, _ = w.Write(nl) +func tsPrintf(level string, format string, args ...any) { + msgChan <- fmt.Sprintf("%s %-6s "+format, slices.Insert(args, 0, any(time.Now().Format("15:04:05.0000")), any(level))...) +} + +func tsPrintln(level string, args ...any) { + msgChan <- fmt.Sprint(slices.Insert(args, 0, any(fmt.Sprintf("%s %-6s", time.Now().Format("15:04:05.0000"), level)))...) } diff --git a/pkg/sftp/mount.go b/pkg/sftp/mount.go index 390a679..52601f4 100644 --- a/pkg/sftp/mount.go +++ b/pkg/sftp/mount.go @@ -98,13 +98,19 @@ func (m *mount) mountVolume() error { "-o", "follow_symlinks", "-o", "allow_root", // needed to make --docker-run work as docker runs as root } + + var sl io.Writer if log.IsDebug() { sshfsArgs = append(sshfsArgs, "-d") + sl = log.Stdlog("debug") + } else { + sl = log.Stdlog("info") } exe := "sshfs" cmd := exec.Command(exe, sshfsArgs...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr + log.Debug(cmd) + cmd.Stdout = sl + cmd.Stderr = sl ctx := context.Background() From 2fa6d4f63307c3e692a6640161de465ca3f6db84 Mon Sep 17 00:00:00 2001 From: Thomas Hallgren Date: Thu, 18 Jul 2024 18:42:30 +0200 Subject: [PATCH 2/5] Fix so that make-debug works with recent telepresence versions. Signed-off-by: Thomas Hallgren --- Makefile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index e363ceb..365306b 100644 --- a/Makefile +++ b/Makefile @@ -53,10 +53,10 @@ enable: rootfs .PHONY: debug debug: rootfs - docker plugin rm --force $(PLUGIN_DEV_IMAGE) 2>/dev/null || true - docker plugin create $(PLUGIN_DEV_IMAGE) $(BUILD_DIR) - docker plugin set $(PLUGIN_DEV_IMAGE) DEBUG=true - docker plugin enable $(PLUGIN_DEV_IMAGE) + docker plugin rm --force $(PLUGIN_DEV_IMAGE)-debug 2>/dev/null || true + docker plugin create $(PLUGIN_DEV_IMAGE)-debug $(BUILD_DIR) + docker plugin set $(PLUGIN_DEV_IMAGE)-debug DEBUG=true + docker plugin enable $(PLUGIN_DEV_IMAGE)-debug # Recreate the plugin from the rootfs with some tag. This target is called # repeatedly in order to give the plugin different tags (because plugins cannot From c6774f1e54cda33721c25293e9e29f779ea04177 Mon Sep 17 00:00:00 2001 From: Thomas Hallgren Date: Thu, 18 Jul 2024 18:42:51 +0200 Subject: [PATCH 3/5] Improve debug instructions in README.md Signed-off-by: Thomas Hallgren --- README.md | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 275344f..457617b 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,14 @@ $ docker plugin install datawire/telemount:arm64 --alias telemount ### Intercept and create volumes +Connect in docker mode and then intercept with `--docker-run`. The mounts will automatically use this plugin: +``` +$ telepresence connect --docker +$ telepresence intercept echo-easy --docker-run -- busybox ls ls /var/run/secrets/kubernetes.io/serviceaccount +``` + +#### More detailed (not using --docker and --docker-run) + Create an intercept. Use `--local-mount-port 1234` to set up a bridge instead of mounting, and `--detailed-ouput --output yaml` so that the command outputs the environment in a readable form: ```console @@ -54,7 +62,16 @@ token ## Debugging -Start by building the plugin for debugging. This command both builds and enables the plugin: +Start by configuring telepresence to not check for the latest version of the plugin, but instead use our debug version by +adding the following yaml to the `config.yml` (on Linux, this will be in `~/.config/telepresence/config.yml`, and on mac +you'll find it in `"$HOME/Library/Application Support/telepresence/config.yml"`: +```yaml +intercept: + telemount: + tag: debug +``` + +Build the plugin for debugging. The command both builds and enables the plugin: ```console $ make debug ``` @@ -67,6 +84,9 @@ and start viewing what it prints on stderr. All logging goes to stderr: ``` $ sudo cat /run/docker/plugins/$PLUGIN_ID/$PLUGIN_ID-stderr ``` + +Now connect telepresence with `--docker` and do an intercept with `--docker-run`. + ## Credits To the [Rclone project](https://github.com/rclone/rclone) project and [PR 5668](https://github.com/rclone/rclone/pull/5668) specifically for showing a good way to create multi-arch plugins. From c84b44d3f9316ac8cb13f43b502eac7acd62b43f Mon Sep 17 00:00:00 2001 From: Thomas Hallgren Date: Thu, 18 Jul 2024 18:44:15 +0200 Subject: [PATCH 4/5] Use -o auto_unmount instead of calling fusermount -uz explicitly. Signed-off-by: Thomas Hallgren --- pkg/sftp/mount.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/pkg/sftp/mount.go b/pkg/sftp/mount.go index 52601f4..e57748d 100644 --- a/pkg/sftp/mount.go +++ b/pkg/sftp/mount.go @@ -96,6 +96,7 @@ func (m *mount) mountVolume() error { // mount directives "-o", "follow_symlinks", + "-o", "auto_unmount", "-o", "allow_root", // needed to make --docker-run work as docker runs as root } @@ -164,15 +165,6 @@ func (m *mount) sshfsWait(cmd *exec.Cmd, starting *atomic.Bool) { m.done <- err } } - m.mounted.Store(false) - - // sshfs sometimes leave the mount point in a bad state. This will clean it up - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - go func() { - defer cancel() - _ = exec.Command("fusermount", "-uz", m.mountPoint).Run() - }() - <-ctx.Done() } func (m *mount) detectSshfsStarted(ctx context.Context, st *unix.Stat_t) error { From 4297de95ce975d4e8c6b43f297f517f84e550d1a Mon Sep 17 00:00:00 2001 From: Thomas Hallgren Date: Thu, 18 Jul 2024 18:49:32 +0200 Subject: [PATCH 5/5] Ensure that a remote mount declaration with port number isn't reused. The mount struct reflects a one-time connection for the sshfs command. It cannot be reused and must therefore be purged when the sshfs terminates. Signed-off-by: Thomas Hallgren --- pkg/sftp/driver.go | 6 +++++- pkg/sftp/mount.go | 13 ++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/sftp/driver.go b/pkg/sftp/driver.go index da47147..e08295b 100644 --- a/pkg/sftp/driver.go +++ b/pkg/sftp/driver.go @@ -238,7 +238,11 @@ func (d *driver) getRemoteMount(host string, port uint16) (*mount, error) { if m, ok := d.remoteMounts[key]; ok { return m, nil } - m := newMount(filepath.Join(d.volumePath, host, ps), host, port) + m := newMount(filepath.Join(d.volumePath, host, ps), host, port, func() { + d.lock.Lock() + delete(d.remoteMounts, key) + d.lock.Unlock() + }) if err := m.mountVolume(); err != nil { return nil, err } diff --git a/pkg/sftp/mount.go b/pkg/sftp/mount.go index e57748d..47ae9f8 100644 --- a/pkg/sftp/mount.go +++ b/pkg/sftp/mount.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "os" "os/exec" "sync/atomic" @@ -18,6 +19,7 @@ import ( // mount is shared between volumeMounts. type mount struct { + cancel context.CancelFunc mountPoint string host string port uint16 @@ -27,12 +29,13 @@ type mount struct { proc *os.Process } -func newMount(mountPoint, host string, port uint16) *mount { +func newMount(mountPoint, host string, port uint16, cancel context.CancelFunc) *mount { return &mount{ mountPoint: mountPoint, host: host, port: port, volumes: make(map[string]*volumeDir), + cancel: cancel, } } @@ -122,11 +125,12 @@ func (m *mount) mountVolume() error { cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} log.Debugf("mounting %s", m) + m.mounted.Store(true) if err := cmd.Start(); err != nil { + m.mounted.Store(false) return fmt.Errorf("failed to start sshfs to mount %s: %w", m.mountPoint, err) } m.proc = cmd.Process - m.mounted.Store(true) m.done = make(chan error, 2) starting := atomic.Bool{} @@ -146,6 +150,8 @@ func (m *mount) mountVolume() error { func (m *mount) sshfsWait(cmd *exec.Cmd, starting *atomic.Bool) { defer close(m.done) err := cmd.Wait() + m.cancel() + m.mounted.Store(false) if err != nil { var ex *exec.ExitError if errors.As(err, &ex) { @@ -159,7 +165,6 @@ func (m *mount) sshfsWait(cmd *exec.Cmd, starting *atomic.Bool) { } // Restore to unmounted state - m.mounted.Store(false) if starting.Swap(false) { if err != nil { m.done <- err @@ -236,6 +241,8 @@ func (m *mount) unmountVolume() (err error) { log.Debug("forcing sshfs to stop") _ = m.proc.Kill() } + } else { + log.Debugf("sshfs on %s is not running", m.mountPoint) } return err }