Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only allow one k0sctl to run simultaneously per host #382

Merged
merged 5 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ var applyCommand = &cli.Command{
phase.NoWait = ctx.Bool("no-wait")

manager := phase.Manager{Config: ctx.Context.Value(ctxConfigKey{}).(*v1beta1.Cluster)}
lockPhase := &phase.Lock{}

manager.AddPhase(
&phase.Connect{},
&phase.DetectOS{},
lockPhase,
&phase.PrepareHosts{},
&phase.GatherFacts{},
&phase.DownloadBinaries{},
Expand All @@ -75,19 +77,22 @@ var applyCommand = &cli.Command{
NoDrain: ctx.Bool("no-drain"),
},
&phase.RunHooks{Stage: "after", Action: "apply"},
&phase.Unlock{Cancel: lockPhase.Cancel},
&phase.Disconnect{},
)

analytics.Client.Publish("apply-start", map[string]interface{}{})

if err := manager.Run(); err != nil {
var result error

if result = manager.Run(); result != nil {
analytics.Client.Publish("apply-failure", map[string]interface{}{"clusterID": manager.Config.Spec.K0s.Metadata.ClusterID})
if lf, err := LogFile(); err == nil {
if ln, ok := lf.(interface{ Name() string }); ok {
log.Errorf("apply failed - log file saved to %s", ln.Name())
}
}
return err
return result
}

analytics.Client.Publish("apply-success", map[string]interface{}{"duration": time.Since(start), "clusterID": manager.Config.Spec.K0s.Metadata.ClusterID})
Expand Down
4 changes: 4 additions & 0 deletions cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@ var backupCommand = &cli.Command{
start := time.Now()

manager := phase.Manager{Config: ctx.Context.Value(ctxConfigKey{}).(*v1beta1.Cluster)}
lockPhase := &phase.Lock{}

manager.AddPhase(
&phase.Connect{},
&phase.DetectOS{},
lockPhase,
&phase.GatherFacts{},
&phase.GatherK0sFacts{},
&phase.RunHooks{Stage: "before", Action: "backup"},
&phase.Backup{},
&phase.RunHooks{Stage: "after", Action: "backup"},
&phase.Unlock{Cancel: lockPhase.Cancel},
&phase.Disconnect{},
)

Expand Down
3 changes: 3 additions & 0 deletions cmd/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,17 @@ var resetCommand = &cli.Command{

manager := phase.Manager{Config: ctx.Context.Value(ctxConfigKey{}).(*v1beta1.Cluster)}

lockPhase := &phase.Lock{}
manager.AddPhase(
&phase.Connect{},
&phase.DetectOS{},
lockPhase,
&phase.PrepareHosts{},
&phase.GatherK0sFacts{},
&phase.RunHooks{Stage: "before", Action: "reset"},
&phase.Reset{},
&phase.RunHooks{Stage: "after", Action: "reset"},
&phase.Unlock{Cancel: lockPhase.Cancel},
&phase.Disconnect{},
)

Expand Down
36 changes: 36 additions & 0 deletions configurer/linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ func (l Linux) K0sJoinTokenPath() string {
return "/etc/k0s/k0stoken"
}

// K0sctlLockFilePath returns a path to a lock file
func (l Linux) K0sctlLockFilePath(h os.Host) string {
if h.Exec("test -d /run/lock", exec.Sudo(h)) == nil {
return "/run/lock/k0sctl"
}

return "/tmp/k0sctl.lock"
}

// TempFile returns a temp file path
func (l Linux) TempFile(h os.Host) (string, error) {
return h.ExecOutput("mktemp")
Expand Down Expand Up @@ -206,3 +215,30 @@ func (l Linux) PrivateAddress(h os.Host, iface, publicip string) (string, error)

return "", fmt.Errorf("not found")
}

// UpsertFile creates a file in path with content only if the file does not exist already
func (l Linux) UpsertFile(h os.Host, path, content string) error {
tmpf, err := l.TempFile(h)
if err != nil {
return err
}
if err := h.Execf(`cat > "%s"`, tmpf, exec.Stdin(content), exec.Sudo(h)); err != nil {
return err
}

defer func() {
_ = h.Execf(`rm -f "%s"`, tmpf, exec.Sudo(h))
}()

// mv -n is atomic
if err := h.Execf(`mv -n "%s" "%s"`, tmpf, path, exec.Sudo(h)); err != nil {
return fmt.Errorf("upsert failed: %w", err)
}

// if original tempfile still exists, error out
if h.Execf(`test -f "%s"`, tmpf) == nil {
return fmt.Errorf("upsert failed")
}

return nil
}
128 changes: 128 additions & 0 deletions phase/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package phase

import (
"context"
"fmt"
gos "os"
"sync"
"time"

retry "github.com/avast/retry-go"
"github.com/k0sproject/k0sctl/analytics"
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1"
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster"
"github.com/k0sproject/rig/exec"
log "github.com/sirupsen/logrus"
)

// Lock acquires an exclusive k0sctl lock on hosts
type Lock struct {
GenericPhase
cfs []func()
instanceID string
m sync.Mutex
wg sync.WaitGroup
}

// Prepare the phase
func (p *Lock) Prepare(c *v1beta1.Cluster) error {
p.Config = c
mid, _ := analytics.MachineID()
p.instanceID = fmt.Sprintf("%s-%d", mid, gos.Getpid())
return nil
}

// Title for the phase
func (p *Lock) Title() string {
return "Acquire exclusive host lock"
}

func (p *Lock) Cancel() {
p.m.Lock()
defer p.m.Unlock()
for _, f := range p.cfs {
f()
}
p.wg.Wait()
}

// Run the phase
func (p *Lock) Run() error {
if err := p.Config.Spec.Hosts.ParallelEach(p.startLock); err != nil {
return err
}
return p.Config.Spec.Hosts.ParallelEach(p.startTicker)
}

func (p *Lock) startTicker(h *cluster.Host) error {
p.wg.Add(1)
lfp := h.Configurer.K0sctlLockFilePath(h)
ticker := time.NewTicker(10 * time.Second)
ctx, cancel := context.WithCancel(context.Background())
p.m.Lock()
p.cfs = append(p.cfs, cancel)
p.m.Unlock()

go func() {
log.Debugf("%s: started periodic update of lock file %s timestamp", h, lfp)
for {
select {
case <-ticker.C:
if err := h.Configurer.Touch(h, lfp, time.Now(), exec.Sudo(h)); err != nil {
log.Warnf("%s: failed to touch lock file: %s", h, err)
}
case <-ctx.Done():
log.Debugf("%s: stopped lock cycle, removing file", h)
if err := h.Configurer.DeleteFile(h, lfp); err != nil {
log.Warnf("%s: failed to remove host lock file: %s", h, err)
}
p.wg.Done()
return
}
}
}()

return nil
}

func (p *Lock) startLock(h *cluster.Host) error {
return retry.Do(
func() error {
return p.tryLock(h)
},
retry.OnRetry(
func(n uint, err error) {
log.Errorf("%s: attempt %d of %d.. trying to obtain a lock on host: %s", h, n+1, retries, err.Error())
},
),
retry.DelayType(retry.CombineDelay(retry.FixedDelay, retry.RandomDelay)),
retry.MaxJitter(time.Second*2),
retry.Delay(time.Second*3),
retry.Attempts(5),
retry.LastErrorOnly(true),
)
}

func (p *Lock) tryLock(h *cluster.Host) error {
lfp := h.Configurer.K0sctlLockFilePath(h)

if err := h.Configurer.UpsertFile(h, lfp, p.instanceID); err != nil {
stat, err := h.Configurer.Stat(h, lfp, exec.Sudo(h))
if err != nil {
return fmt.Errorf("lock file disappeared: %w", err)
}
content, err := h.Configurer.ReadFile(h, lfp)
if err != nil {
return fmt.Errorf("failed to read lock file: %w", err)
}
if content != p.instanceID {
if time.Since(stat.ModTime()) < 30*time.Second {
return fmt.Errorf("another instance of k0sctl is currently operating on the host")
}
_ = h.Configurer.DeleteFile(h, lfp)
return fmt.Errorf("removed existing expired lock file")
}
}

return nil
}
5 changes: 5 additions & 0 deletions phase/prepare_hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
// PrepareHosts installs required packages and so on on the hosts.
type PrepareHosts struct {
GenericPhase
cancel func()
}

// Title for the phase
Expand All @@ -27,6 +28,10 @@ type prepare interface {
Prepare(os.Host) error
}

func (p *PrepareHosts) CleanUp() {
p.cancel()
}

func (p *PrepareHosts) prepareHost(h *cluster.Host) error {
if c, ok := h.Configurer.(prepare); ok {
if err := c.Prepare(h); err != nil {
Expand Down
34 changes: 34 additions & 0 deletions phase/unlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package phase

import (
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1"
log "github.com/sirupsen/logrus"
)

// Unlock acquires an exclusive k0sctl lock on hosts
type Unlock struct {
GenericPhase
Cancel func()
}

// Prepare the phase
func (p *Unlock) Prepare(c *v1beta1.Cluster) error {
p.Config = c
if p.Cancel == nil {
p.Cancel = func() {
log.Fatalf("cancel function not defined")
}
}
return nil
}

// Title for the phase
func (p *Unlock) Title() string {
return "Release exclusive host lock"
}

// Run the phase
func (p *Unlock) Run() error {
p.Cancel()
return nil
}
2 changes: 2 additions & 0 deletions pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ type configurer interface {
CleanupServiceEnvironment(os.Host, string) error
Stat(os.Host, string, ...exec.Option) (*os.FileInfo, error)
Touch(os.Host, string, time.Time, ...exec.Option) error
K0sctlLockFilePath(os.Host) string
UpsertFile(os.Host, string, string) error
}

// HostMetadata resolved metadata for host
Expand Down