Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that a remote mount declaration with port number isn't reused. #1

Merged
merged 5 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ enable: rootfs

.PHONY: debug
debug: rootfs
docker plugin rm --force $(PLUGIN_DEV_IMAGE) 2>/dev/null || true
docker plugin create $(PLUGIN_DEV_IMAGE) $(BUILD_DIR)
docker plugin set $(PLUGIN_DEV_IMAGE) DEBUG=true
docker plugin enable $(PLUGIN_DEV_IMAGE)
docker plugin rm --force $(PLUGIN_DEV_IMAGE)-debug 2>/dev/null || true
docker plugin create $(PLUGIN_DEV_IMAGE)-debug $(BUILD_DIR)
docker plugin set $(PLUGIN_DEV_IMAGE)-debug DEBUG=true
docker plugin enable $(PLUGIN_DEV_IMAGE)-debug

# Recreate the plugin from the rootfs with some tag. This target is called
# repeatedly in order to give the plugin different tags (because plugins cannot
Expand Down
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ $ docker plugin install datawire/telemount:arm64 --alias telemount

### Intercept and create volumes

Connect in docker mode and then intercept with `--docker-run`. The mounts will automatically use this plugin:
```
$ telepresence connect --docker
$ telepresence intercept echo-easy --docker-run -- busybox ls ls /var/run/secrets/kubernetes.io/serviceaccount
```

#### More detailed (not using --docker and --docker-run)

Create an intercept. Use `--local-mount-port 1234` to set up a bridge instead of mounting, and `--detailed-ouput --output yaml` so that
the command outputs the environment in a readable form:
```console
Expand All @@ -54,7 +62,16 @@ token

## Debugging

Start by building the plugin for debugging. This command both builds and enables the plugin:
Start by configuring telepresence to not check for the latest version of the plugin, but instead use our debug version by
adding the following yaml to the `config.yml` (on Linux, this will be in `~/.config/telepresence/config.yml`, and on mac
you'll find it in `"$HOME/Library/Application Support/telepresence/config.yml"`:
```yaml
intercept:
telemount:
tag: debug
```

Build the plugin for debugging. The command both builds and enables the plugin:
```console
$ make debug
```
Expand All @@ -67,6 +84,9 @@ and start viewing what it prints on stderr. All logging goes to stderr:
```
$ sudo cat /run/docker/plugins/$PLUGIN_ID/$PLUGIN_ID-stderr
```

Now connect telepresence with `--docker` and do an intercept with `--docker-run`.

## Credits
To the [Rclone project](https://github.com/rclone/rclone) project and [PR 5668](https://github.com/rclone/rclone/pull/5668)
specifically for showing a good way to create multi-arch plugins.
Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"os"
"strconv"

Expand All @@ -17,6 +18,9 @@ func main() {
ok, _ = strconv.ParseBool(debug)
log.SetDebug(ok)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Start(ctx.Done())

if err := volume.NewHandler(sftp.NewDriver()).ServeUnix(pluginSocket, 0); err != nil {
log.Fatal(err)
Expand Down
117 changes: 88 additions & 29 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,67 +8,126 @@
package log

import (
"bytes"
"fmt"
"io"
"math"
"os"
"slices"
"strings"
"sync"
"time"
)

var nl = []byte{'\n'}
var msgChan = make(chan string, 10)
var debug = false

func SetDebug(flag bool) {
debug = flag
func Start(doneCh <-chan struct{}) {
// This code is designed to send the log as a serialized stream of batches that are at least
// 50 ms apart.
// For reasons unknown, logging too quickly will result in lost log output when capturing
// the output under /run/docker/plugins.
w := os.Stderr
var buf bytes.Buffer
bufLock := sync.Mutex{}
fire := time.AfterFunc(time.Duration(math.MaxInt64), func() {
bufLock.Lock()
if buf.Len() > 0 {
_, _ = w.Write(buf.Bytes())
buf.Reset()
}
bufLock.Unlock()
})

go func() {
for {
select {
case msg := <-msgChan:
bufLock.Lock()
buf.WriteString(msg)
buf.WriteByte('\n')
bufLock.Unlock()
fire.Reset(50 * time.Millisecond)
case <-doneCh:
fire.Stop()
// Output any remaining logs.
// No need for locks now. The timer is dead.
for len(msgChan) > 0 {
buf.WriteString(<-msgChan)
buf.WriteByte('\n')
}
if buf.Len() > 0 {
_, _ = w.Write(buf.Bytes())
}
return
}
}
}()
}

func Error(v any) {
switch v := v.(type) {
case nil:
case error:
_, _ = fmt.Fprintln(os.Stderr, v.Error())
case string:
_, _ = fmt.Fprintln(os.Stderr, v)
case fmt.Stringer:
_, _ = fmt.Fprintln(os.Stderr, v)
default:
err := fmt.Errorf("%v", v)
_, _ = fmt.Fprintln(os.Stderr, err.Error())
type logWriter string

func (w logWriter) Write(p []byte) (n int, err error) {
ps := string(p)
if len(ps) > 0 {
for _, line := range strings.Split(string(p), "\n") {
if len(line) > 0 {
tsPrintln(string(w), line)
}
}
}
return len(p), nil
}

func Stdlog(level string) io.Writer {
return logWriter(level)
}

func SetDebug(flag bool) {
debug = flag
}

func IsDebug() bool {
return debug
}

func Errorf(format string, args ...any) {
Error(fmt.Errorf(format, args...))
func Fatal(args ...any) {
Error(args...)
os.Exit(1)
}

func Fatal(v any) {
Error(v)
os.Exit(1)
func Error(args ...any) {
tsPrintln("error", args...)
}

func Info(v any) {
_, _ = fmt.Fprintln(os.Stderr, v)
func Errorf(format string, args ...any) {
tsPrintf("error", format, args...)
}

func Info(args ...any) {
tsPrintln("info", args...)
}

func Infof(format string, args ...any) {
fprintfln(os.Stderr, format, args...)
tsPrintf("info", format, args...)
}

func Debug(v any) {
func Debug(args ...any) {
if debug {
Info(v)
tsPrintln("debug", args...)
}
}

func Debugf(format string, args ...any) {
if debug {
Infof(format, args...)
tsPrintf("debug", format, args...)
}
}

func fprintfln(w io.Writer, format string, args ...any) {
_, _ = fmt.Fprintf(w, format, args...)
_, _ = w.Write(nl)
func tsPrintf(level string, format string, args ...any) {
msgChan <- fmt.Sprintf("%s %-6s "+format, slices.Insert(args, 0, any(time.Now().Format("15:04:05.0000")), any(level))...)
}

func tsPrintln(level string, args ...any) {
msgChan <- fmt.Sprint(slices.Insert(args, 0, any(fmt.Sprintf("%s %-6s", time.Now().Format("15:04:05.0000"), level)))...)
}
6 changes: 5 additions & 1 deletion pkg/sftp/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,11 @@ func (d *driver) getRemoteMount(host string, port uint16) (*mount, error) {
if m, ok := d.remoteMounts[key]; ok {
return m, nil
}
m := newMount(filepath.Join(d.volumePath, host, ps), host, port)
m := newMount(filepath.Join(d.volumePath, host, ps), host, port, func() {
d.lock.Lock()
delete(d.remoteMounts, key)
d.lock.Unlock()
})
if err := m.mountVolume(); err != nil {
return nil, err
}
Expand Down
33 changes: 19 additions & 14 deletions pkg/sftp/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"sync/atomic"
Expand All @@ -18,6 +19,7 @@ import (

// mount is shared between volumeMounts.
type mount struct {
cancel context.CancelFunc
mountPoint string
host string
port uint16
Expand All @@ -27,12 +29,13 @@ type mount struct {
proc *os.Process
}

func newMount(mountPoint, host string, port uint16) *mount {
func newMount(mountPoint, host string, port uint16, cancel context.CancelFunc) *mount {
return &mount{
mountPoint: mountPoint,
host: host,
port: port,
volumes: make(map[string]*volumeDir),
cancel: cancel,
}
}

Expand Down Expand Up @@ -96,15 +99,22 @@ func (m *mount) mountVolume() error {

// mount directives
"-o", "follow_symlinks",
"-o", "auto_unmount",
"-o", "allow_root", // needed to make --docker-run work as docker runs as root
}

var sl io.Writer
if log.IsDebug() {
sshfsArgs = append(sshfsArgs, "-d")
sl = log.Stdlog("debug")
} else {
sl = log.Stdlog("info")
}
exe := "sshfs"
cmd := exec.Command(exe, sshfsArgs...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
log.Debug(cmd)
cmd.Stdout = sl
cmd.Stderr = sl

ctx := context.Background()

Expand All @@ -115,11 +125,12 @@ func (m *mount) mountVolume() error {
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

log.Debugf("mounting %s", m)
m.mounted.Store(true)
if err := cmd.Start(); err != nil {
m.mounted.Store(false)
return fmt.Errorf("failed to start sshfs to mount %s: %w", m.mountPoint, err)
}
m.proc = cmd.Process
m.mounted.Store(true)

m.done = make(chan error, 2)
starting := atomic.Bool{}
Expand All @@ -139,6 +150,8 @@ func (m *mount) mountVolume() error {
func (m *mount) sshfsWait(cmd *exec.Cmd, starting *atomic.Bool) {
defer close(m.done)
err := cmd.Wait()
m.cancel()
m.mounted.Store(false)
if err != nil {
var ex *exec.ExitError
if errors.As(err, &ex) {
Expand All @@ -152,21 +165,11 @@ func (m *mount) sshfsWait(cmd *exec.Cmd, starting *atomic.Bool) {
}

// Restore to unmounted state
m.mounted.Store(false)
if starting.Swap(false) {
if err != nil {
m.done <- err
}
}
m.mounted.Store(false)

// sshfs sometimes leave the mount point in a bad state. This will clean it up
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
go func() {
defer cancel()
_ = exec.Command("fusermount", "-uz", m.mountPoint).Run()
}()
<-ctx.Done()
}

func (m *mount) detectSshfsStarted(ctx context.Context, st *unix.Stat_t) error {
Expand Down Expand Up @@ -238,6 +241,8 @@ func (m *mount) unmountVolume() (err error) {
log.Debug("forcing sshfs to stop")
_ = m.proc.Kill()
}
} else {
log.Debugf("sshfs on %s is not running", m.mountPoint)
}
return err
}
Expand Down