diff --git a/cmd/apply.go b/cmd/apply.go index cfa1fd9d..fcf3a7a8 100644 --- a/cmd/apply.go +++ b/cmd/apply.go @@ -48,10 +48,12 @@ var applyCommand = &cli.Command{ phase.NoWait = ctx.Bool("no-wait") manager := phase.Manager{Config: ctx.Context.Value(ctxConfigKey{}).(*v1beta1.Cluster)} + lockPhase := &phase.Lock{} manager.AddPhase( &phase.Connect{}, &phase.DetectOS{}, + lockPhase, &phase.PrepareHosts{}, &phase.GatherFacts{}, &phase.DownloadBinaries{}, @@ -75,19 +77,22 @@ var applyCommand = &cli.Command{ NoDrain: ctx.Bool("no-drain"), }, &phase.RunHooks{Stage: "after", Action: "apply"}, + &phase.Unlock{Cancel: lockPhase.Cancel}, &phase.Disconnect{}, ) analytics.Client.Publish("apply-start", map[string]interface{}{}) - if err := manager.Run(); err != nil { + var result error + + if result = manager.Run(); result != nil { analytics.Client.Publish("apply-failure", map[string]interface{}{"clusterID": manager.Config.Spec.K0s.Metadata.ClusterID}) if lf, err := LogFile(); err == nil { if ln, ok := lf.(interface{ Name() string }); ok { log.Errorf("apply failed - log file saved to %s", ln.Name()) } } - return err + return result } analytics.Client.Publish("apply-success", map[string]interface{}{"duration": time.Since(start), "clusterID": manager.Config.Spec.K0s.Metadata.ClusterID}) diff --git a/cmd/backup.go b/cmd/backup.go index d56487ac..28b27745 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -28,14 +28,18 @@ var backupCommand = &cli.Command{ start := time.Now() manager := phase.Manager{Config: ctx.Context.Value(ctxConfigKey{}).(*v1beta1.Cluster)} + lockPhase := &phase.Lock{} + manager.AddPhase( &phase.Connect{}, &phase.DetectOS{}, + lockPhase, &phase.GatherFacts{}, &phase.GatherK0sFacts{}, &phase.RunHooks{Stage: "before", Action: "backup"}, &phase.Backup{}, &phase.RunHooks{Stage: "after", Action: "backup"}, + &phase.Unlock{Cancel: lockPhase.Cancel}, &phase.Disconnect{}, ) diff --git a/cmd/reset.go b/cmd/reset.go index f0f7f68b..565f809e 100644 --- a/cmd/reset.go +++ b/cmd/reset.go @@ -52,14 +52,17 @@ var resetCommand = &cli.Command{ manager := phase.Manager{Config: ctx.Context.Value(ctxConfigKey{}).(*v1beta1.Cluster)} + lockPhase := &phase.Lock{} manager.AddPhase( &phase.Connect{}, &phase.DetectOS{}, + lockPhase, &phase.PrepareHosts{}, &phase.GatherK0sFacts{}, &phase.RunHooks{Stage: "before", Action: "reset"}, &phase.Reset{}, &phase.RunHooks{Stage: "after", Action: "reset"}, + &phase.Unlock{Cancel: lockPhase.Cancel}, &phase.Disconnect{}, ) diff --git a/configurer/linux.go b/configurer/linux.go index 1a4f83d2..fe689086 100644 --- a/configurer/linux.go +++ b/configurer/linux.go @@ -87,6 +87,15 @@ func (l Linux) K0sJoinTokenPath() string { return "/etc/k0s/k0stoken" } +// K0sctlLockFilePath returns a path to a lock file +func (l Linux) K0sctlLockFilePath(h os.Host) string { + if h.Exec("test -d /run/lock", exec.Sudo(h)) == nil { + return "/run/lock/k0sctl" + } + + return "/tmp/k0sctl.lock" +} + // TempFile returns a temp file path func (l Linux) TempFile(h os.Host) (string, error) { return h.ExecOutput("mktemp") @@ -206,3 +215,30 @@ func (l Linux) PrivateAddress(h os.Host, iface, publicip string) (string, error) return "", fmt.Errorf("not found") } + +// UpsertFile creates a file in path with content only if the file does not exist already +func (l Linux) UpsertFile(h os.Host, path, content string) error { + tmpf, err := l.TempFile(h) + if err != nil { + return err + } + if err := h.Execf(`cat > "%s"`, tmpf, exec.Stdin(content), exec.Sudo(h)); err != nil { + return err + } + + defer func() { + _ = h.Execf(`rm -f "%s"`, tmpf, exec.Sudo(h)) + }() + + // mv -n is atomic + if err := h.Execf(`mv -n "%s" "%s"`, tmpf, path, exec.Sudo(h)); err != nil { + return fmt.Errorf("upsert failed: %w", err) + } + + // if original tempfile still exists, error out + if h.Execf(`test -f "%s"`, tmpf) == nil { + return fmt.Errorf("upsert failed") + } + + return nil +} diff --git a/phase/lock.go b/phase/lock.go new file mode 100644 index 00000000..f8646e7a --- /dev/null +++ b/phase/lock.go @@ -0,0 +1,128 @@ +package phase + +import ( + "context" + "fmt" + gos "os" + "sync" + "time" + + retry "github.com/avast/retry-go" + "github.com/k0sproject/k0sctl/analytics" + "github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1" + "github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster" + "github.com/k0sproject/rig/exec" + log "github.com/sirupsen/logrus" +) + +// Lock acquires an exclusive k0sctl lock on hosts +type Lock struct { + GenericPhase + cfs []func() + instanceID string + m sync.Mutex + wg sync.WaitGroup +} + +// Prepare the phase +func (p *Lock) Prepare(c *v1beta1.Cluster) error { + p.Config = c + mid, _ := analytics.MachineID() + p.instanceID = fmt.Sprintf("%s-%d", mid, gos.Getpid()) + return nil +} + +// Title for the phase +func (p *Lock) Title() string { + return "Acquire exclusive host lock" +} + +func (p *Lock) Cancel() { + p.m.Lock() + defer p.m.Unlock() + for _, f := range p.cfs { + f() + } + p.wg.Wait() +} + +// Run the phase +func (p *Lock) Run() error { + if err := p.Config.Spec.Hosts.ParallelEach(p.startLock); err != nil { + return err + } + return p.Config.Spec.Hosts.ParallelEach(p.startTicker) +} + +func (p *Lock) startTicker(h *cluster.Host) error { + p.wg.Add(1) + lfp := h.Configurer.K0sctlLockFilePath(h) + ticker := time.NewTicker(10 * time.Second) + ctx, cancel := context.WithCancel(context.Background()) + p.m.Lock() + p.cfs = append(p.cfs, cancel) + p.m.Unlock() + + go func() { + log.Debugf("%s: started periodic update of lock file %s timestamp", h, lfp) + for { + select { + case <-ticker.C: + if err := h.Configurer.Touch(h, lfp, time.Now(), exec.Sudo(h)); err != nil { + log.Warnf("%s: failed to touch lock file: %s", h, err) + } + case <-ctx.Done(): + log.Debugf("%s: stopped lock cycle, removing file", h) + if err := h.Configurer.DeleteFile(h, lfp); err != nil { + log.Warnf("%s: failed to remove host lock file: %s", h, err) + } + p.wg.Done() + return + } + } + }() + + return nil +} + +func (p *Lock) startLock(h *cluster.Host) error { + return retry.Do( + func() error { + return p.tryLock(h) + }, + retry.OnRetry( + func(n uint, err error) { + log.Errorf("%s: attempt %d of %d.. trying to obtain a lock on host: %s", h, n+1, retries, err.Error()) + }, + ), + retry.DelayType(retry.CombineDelay(retry.FixedDelay, retry.RandomDelay)), + retry.MaxJitter(time.Second*2), + retry.Delay(time.Second*3), + retry.Attempts(5), + retry.LastErrorOnly(true), + ) +} + +func (p *Lock) tryLock(h *cluster.Host) error { + lfp := h.Configurer.K0sctlLockFilePath(h) + + if err := h.Configurer.UpsertFile(h, lfp, p.instanceID); err != nil { + stat, err := h.Configurer.Stat(h, lfp, exec.Sudo(h)) + if err != nil { + return fmt.Errorf("lock file disappeared: %w", err) + } + content, err := h.Configurer.ReadFile(h, lfp) + if err != nil { + return fmt.Errorf("failed to read lock file: %w", err) + } + if content != p.instanceID { + if time.Since(stat.ModTime()) < 30*time.Second { + return fmt.Errorf("another instance of k0sctl is currently operating on the host") + } + _ = h.Configurer.DeleteFile(h, lfp) + return fmt.Errorf("removed existing expired lock file") + } + } + + return nil +} diff --git a/phase/prepare_hosts.go b/phase/prepare_hosts.go index d36d8b29..c9e1fad6 100644 --- a/phase/prepare_hosts.go +++ b/phase/prepare_hosts.go @@ -11,6 +11,7 @@ import ( // PrepareHosts installs required packages and so on on the hosts. type PrepareHosts struct { GenericPhase + cancel func() } // Title for the phase @@ -27,6 +28,10 @@ type prepare interface { Prepare(os.Host) error } +func (p *PrepareHosts) CleanUp() { + p.cancel() +} + func (p *PrepareHosts) prepareHost(h *cluster.Host) error { if c, ok := h.Configurer.(prepare); ok { if err := c.Prepare(h); err != nil { diff --git a/phase/unlock.go b/phase/unlock.go new file mode 100644 index 00000000..c6efa85f --- /dev/null +++ b/phase/unlock.go @@ -0,0 +1,34 @@ +package phase + +import ( + "github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1" + log "github.com/sirupsen/logrus" +) + +// Unlock acquires an exclusive k0sctl lock on hosts +type Unlock struct { + GenericPhase + Cancel func() +} + +// Prepare the phase +func (p *Unlock) Prepare(c *v1beta1.Cluster) error { + p.Config = c + if p.Cancel == nil { + p.Cancel = func() { + log.Fatalf("cancel function not defined") + } + } + return nil +} + +// Title for the phase +func (p *Unlock) Title() string { + return "Release exclusive host lock" +} + +// Run the phase +func (p *Unlock) Run() error { + p.Cancel() + return nil +} diff --git a/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster/host.go b/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster/host.go index 12ffbcfd..6d608e9c 100644 --- a/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster/host.go +++ b/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster/host.go @@ -117,6 +117,8 @@ type configurer interface { CleanupServiceEnvironment(os.Host, string) error Stat(os.Host, string, ...exec.Option) (*os.FileInfo, error) Touch(os.Host, string, time.Time, ...exec.Option) error + K0sctlLockFilePath(os.Host) string + UpsertFile(os.Host, string, string) error } // HostMetadata resolved metadata for host