Skip to content

Commit

Permalink
Merge pull request #1 from datawire/thallgren/no-reuse-after-sshfs-end
Browse files Browse the repository at this point in the history
Ensure that a remote mount declaration with port number isn't reused.
  • Loading branch information
thallgren authored Jul 18, 2024
2 parents bdc394e + 4297de9 commit 9ac6698
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 49 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ enable: rootfs

.PHONY: debug
debug: rootfs
docker plugin rm --force $(PLUGIN_DEV_IMAGE) 2>/dev/null || true
docker plugin create $(PLUGIN_DEV_IMAGE) $(BUILD_DIR)
docker plugin set $(PLUGIN_DEV_IMAGE) DEBUG=true
docker plugin enable $(PLUGIN_DEV_IMAGE)
docker plugin rm --force $(PLUGIN_DEV_IMAGE)-debug 2>/dev/null || true
docker plugin create $(PLUGIN_DEV_IMAGE)-debug $(BUILD_DIR)
docker plugin set $(PLUGIN_DEV_IMAGE)-debug DEBUG=true
docker plugin enable $(PLUGIN_DEV_IMAGE)-debug

# Recreate the plugin from the rootfs with some tag. This target is called
# repeatedly in order to give the plugin different tags (because plugins cannot
Expand Down
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ $ docker plugin install datawire/telemount:arm64 --alias telemount

### Intercept and create volumes

Connect in docker mode and then intercept with `--docker-run`. The mounts will automatically use this plugin:
```
$ telepresence connect --docker
$ telepresence intercept echo-easy --docker-run -- busybox ls ls /var/run/secrets/kubernetes.io/serviceaccount
```

#### More detailed (not using --docker and --docker-run)

Create an intercept. Use `--local-mount-port 1234` to set up a bridge instead of mounting, and `--detailed-ouput --output yaml` so that
the command outputs the environment in a readable form:
```console
Expand All @@ -54,7 +62,16 @@ token

## Debugging

Start by building the plugin for debugging. This command both builds and enables the plugin:
Start by configuring telepresence to not check for the latest version of the plugin, but instead use our debug version by
adding the following yaml to the `config.yml` (on Linux, this will be in `~/.config/telepresence/config.yml`, and on mac
you'll find it in `"$HOME/Library/Application Support/telepresence/config.yml"`:
```yaml
intercept:
telemount:
tag: debug
```
Build the plugin for debugging. The command both builds and enables the plugin:
```console
$ make debug
```
Expand All @@ -67,6 +84,9 @@ and start viewing what it prints on stderr. All logging goes to stderr:
```
$ sudo cat /run/docker/plugins/$PLUGIN_ID/$PLUGIN_ID-stderr
```

Now connect telepresence with `--docker` and do an intercept with `--docker-run`.

## Credits
To the [Rclone project](https://github.com/rclone/rclone) project and [PR 5668](https://github.com/rclone/rclone/pull/5668)
specifically for showing a good way to create multi-arch plugins.
Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"os"
"strconv"

Expand All @@ -17,6 +18,9 @@ func main() {
ok, _ = strconv.ParseBool(debug)
log.SetDebug(ok)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Start(ctx.Done())

if err := volume.NewHandler(sftp.NewDriver()).ServeUnix(pluginSocket, 0); err != nil {
log.Fatal(err)
Expand Down
117 changes: 88 additions & 29 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,67 +8,126 @@
package log

import (
"bytes"
"fmt"
"io"
"math"
"os"
"slices"
"strings"
"sync"
"time"
)

var nl = []byte{'\n'}
var msgChan = make(chan string, 10)
var debug = false

func SetDebug(flag bool) {
debug = flag
func Start(doneCh <-chan struct{}) {
// This code is designed to send the log as a serialized stream of batches that are at least
// 50 ms apart.
// For reasons unknown, logging too quickly will result in lost log output when capturing
// the output under /run/docker/plugins.
w := os.Stderr
var buf bytes.Buffer
bufLock := sync.Mutex{}
fire := time.AfterFunc(time.Duration(math.MaxInt64), func() {
bufLock.Lock()
if buf.Len() > 0 {
_, _ = w.Write(buf.Bytes())
buf.Reset()
}
bufLock.Unlock()
})

go func() {
for {
select {
case msg := <-msgChan:
bufLock.Lock()
buf.WriteString(msg)
buf.WriteByte('\n')
bufLock.Unlock()
fire.Reset(50 * time.Millisecond)
case <-doneCh:
fire.Stop()
// Output any remaining logs.
// No need for locks now. The timer is dead.
for len(msgChan) > 0 {
buf.WriteString(<-msgChan)
buf.WriteByte('\n')
}
if buf.Len() > 0 {
_, _ = w.Write(buf.Bytes())
}
return
}
}
}()
}

func Error(v any) {
switch v := v.(type) {
case nil:
case error:
_, _ = fmt.Fprintln(os.Stderr, v.Error())
case string:
_, _ = fmt.Fprintln(os.Stderr, v)
case fmt.Stringer:
_, _ = fmt.Fprintln(os.Stderr, v)
default:
err := fmt.Errorf("%v", v)
_, _ = fmt.Fprintln(os.Stderr, err.Error())
type logWriter string

func (w logWriter) Write(p []byte) (n int, err error) {
ps := string(p)
if len(ps) > 0 {
for _, line := range strings.Split(string(p), "\n") {
if len(line) > 0 {
tsPrintln(string(w), line)
}
}
}
return len(p), nil
}

func Stdlog(level string) io.Writer {
return logWriter(level)
}

func SetDebug(flag bool) {
debug = flag
}

func IsDebug() bool {
return debug
}

func Errorf(format string, args ...any) {
Error(fmt.Errorf(format, args...))
func Fatal(args ...any) {
Error(args...)
os.Exit(1)
}

func Fatal(v any) {
Error(v)
os.Exit(1)
func Error(args ...any) {
tsPrintln("error", args...)
}

func Info(v any) {
_, _ = fmt.Fprintln(os.Stderr, v)
func Errorf(format string, args ...any) {
tsPrintf("error", format, args...)
}

func Info(args ...any) {
tsPrintln("info", args...)
}

func Infof(format string, args ...any) {
fprintfln(os.Stderr, format, args...)
tsPrintf("info", format, args...)
}

func Debug(v any) {
func Debug(args ...any) {
if debug {
Info(v)
tsPrintln("debug", args...)
}
}

func Debugf(format string, args ...any) {
if debug {
Infof(format, args...)
tsPrintf("debug", format, args...)
}
}

func fprintfln(w io.Writer, format string, args ...any) {
_, _ = fmt.Fprintf(w, format, args...)
_, _ = w.Write(nl)
func tsPrintf(level string, format string, args ...any) {
msgChan <- fmt.Sprintf("%s %-6s "+format, slices.Insert(args, 0, any(time.Now().Format("15:04:05.0000")), any(level))...)
}

func tsPrintln(level string, args ...any) {
msgChan <- fmt.Sprint(slices.Insert(args, 0, any(fmt.Sprintf("%s %-6s", time.Now().Format("15:04:05.0000"), level)))...)
}
6 changes: 5 additions & 1 deletion pkg/sftp/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,11 @@ func (d *driver) getRemoteMount(host string, port uint16) (*mount, error) {
if m, ok := d.remoteMounts[key]; ok {
return m, nil
}
m := newMount(filepath.Join(d.volumePath, host, ps), host, port)
m := newMount(filepath.Join(d.volumePath, host, ps), host, port, func() {
d.lock.Lock()
delete(d.remoteMounts, key)
d.lock.Unlock()
})
if err := m.mountVolume(); err != nil {
return nil, err
}
Expand Down
33 changes: 19 additions & 14 deletions pkg/sftp/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"sync/atomic"
Expand All @@ -18,6 +19,7 @@ import (

// mount is shared between volumeMounts.
type mount struct {
cancel context.CancelFunc
mountPoint string
host string
port uint16
Expand All @@ -27,12 +29,13 @@ type mount struct {
proc *os.Process
}

func newMount(mountPoint, host string, port uint16) *mount {
func newMount(mountPoint, host string, port uint16, cancel context.CancelFunc) *mount {
return &mount{
mountPoint: mountPoint,
host: host,
port: port,
volumes: make(map[string]*volumeDir),
cancel: cancel,
}
}

Expand Down Expand Up @@ -96,15 +99,22 @@ func (m *mount) mountVolume() error {

// mount directives
"-o", "follow_symlinks",
"-o", "auto_unmount",
"-o", "allow_root", // needed to make --docker-run work as docker runs as root
}

var sl io.Writer
if log.IsDebug() {
sshfsArgs = append(sshfsArgs, "-d")
sl = log.Stdlog("debug")
} else {
sl = log.Stdlog("info")
}
exe := "sshfs"
cmd := exec.Command(exe, sshfsArgs...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
log.Debug(cmd)
cmd.Stdout = sl
cmd.Stderr = sl

ctx := context.Background()

Expand All @@ -115,11 +125,12 @@ func (m *mount) mountVolume() error {
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

log.Debugf("mounting %s", m)
m.mounted.Store(true)
if err := cmd.Start(); err != nil {
m.mounted.Store(false)
return fmt.Errorf("failed to start sshfs to mount %s: %w", m.mountPoint, err)
}
m.proc = cmd.Process
m.mounted.Store(true)

m.done = make(chan error, 2)
starting := atomic.Bool{}
Expand All @@ -139,6 +150,8 @@ func (m *mount) mountVolume() error {
func (m *mount) sshfsWait(cmd *exec.Cmd, starting *atomic.Bool) {
defer close(m.done)
err := cmd.Wait()
m.cancel()
m.mounted.Store(false)
if err != nil {
var ex *exec.ExitError
if errors.As(err, &ex) {
Expand All @@ -152,21 +165,11 @@ func (m *mount) sshfsWait(cmd *exec.Cmd, starting *atomic.Bool) {
}

// Restore to unmounted state
m.mounted.Store(false)
if starting.Swap(false) {
if err != nil {
m.done <- err
}
}
m.mounted.Store(false)

// sshfs sometimes leave the mount point in a bad state. This will clean it up
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
go func() {
defer cancel()
_ = exec.Command("fusermount", "-uz", m.mountPoint).Run()
}()
<-ctx.Done()
}

func (m *mount) detectSshfsStarted(ctx context.Context, st *unix.Stat_t) error {
Expand Down Expand Up @@ -238,6 +241,8 @@ func (m *mount) unmountVolume() (err error) {
log.Debug("forcing sshfs to stop")
_ = m.proc.Kill()
}
} else {
log.Debugf("sshfs on %s is not running", m.mountPoint)
}
return err
}
Expand Down

0 comments on commit 9ac6698

Please sign in to comment.