From 1a9e7d5bfe84e3119e38238820b1c6d0cd5b3514 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 25 Feb 2021 17:01:58 +0100 Subject: [PATCH 1/9] is that it? --- .../pkg/agent/operation/operator.go | 11 +++++++++++ .../install/atomic/atomic_installer.go | 10 ++++++++++ .../pkg/artifact/install/installer.go | 18 ++++++++++++++++-- .../pkg/artifact/install/zip/zip_installer.go | 11 ++++++++--- 4 files changed, 45 insertions(+), 5 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator.go b/x-pack/elastic-agent/pkg/agent/operation/operator.go index a95bfe5b165..a0416545b11 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator.go @@ -36,6 +36,10 @@ const ( isMonitoringLogsFlag = 1 << 1 ) +type waiter interface { + Wait() +} + // Operator runs Start/Stop/Update operations // it is responsible for detecting reconnect to existing processes // based on backed up configuration @@ -182,6 +186,13 @@ func (o *Operator) HandleConfig(cfg configrequest.Request) error { // Shutdown handles shutting down the running apps for Agent shutdown. func (o *Operator) Shutdown() { + // wait for installer and downloader + if awaitable, ok := o.installer.(waiter); ok { + o.logger.Infof("waiting for installer of pipeline '%s' to finish", o.pipelineID) + awaitable.Wait() + o.logger.Debugf("pipeline installer '%s' done", o.pipelineID) + } + for _, app := range o.apps { app.Shutdown() } diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go index f6c139ca463..1793ff30fb5 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "os" "path/filepath" + "sync" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" @@ -22,6 +23,7 @@ type embeddedInstaller interface { // successful finish. type Installer struct { installer embeddedInstaller + wg sync.WaitGroup } // NewInstaller creates a new AtomicInstaller @@ -31,8 +33,16 @@ func NewInstaller(i embeddedInstaller) (*Installer, error) { }, nil } +// Allows caller to wait for install to be finished +func (i *Installer) Wait() { + i.wg.Wait() +} + // Install performs installation of program in a specific version. func (i *Installer) Install(ctx context.Context, spec program.Spec, version, installDir string) error { + i.wg.Add(1) + defer i.wg.Done() + // tar installer uses Dir of installDir to determine location of unpack tempDir, err := ioutil.TempDir(paths.TempDir(), "elastic-agent-install") if err != nil { diff --git a/x-pack/elastic-agent/pkg/artifact/install/installer.go b/x-pack/elastic-agent/pkg/artifact/install/installer.go index b99563ff997..a53cc46633b 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/installer.go @@ -12,6 +12,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/atomic" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/awaitable" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/dir" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/hooks" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/tar" @@ -39,12 +40,20 @@ type InstallerChecker interface { Check(ctx context.Context, spec program.Spec, version, installDir string) error } +// AwaitableInstallerChecker is an interface that installs, checks but also is awaitable to check when actions are done. +type AwaitableInstallerChecker interface { + InstallerChecker + + // Waits for its work to be done. + Wait() +} + // NewInstaller returns a correct installer associated with a // package type: // - rpm -> rpm installer // - deb -> deb installer // - binary -> zip installer on windows, tar installer on linux and mac -func NewInstaller(config *artifact.Config) (InstallerChecker, error) { +func NewInstaller(config *artifact.Config) (AwaitableInstallerChecker, error) { if config == nil { return nil, ErrConfigNotProvided } @@ -66,5 +75,10 @@ func NewInstaller(config *artifact.Config) (InstallerChecker, error) { return nil, err } - return hooks.NewInstallerChecker(atomicInstaller, dir.NewChecker()) + hooksInstaller, err := hooks.NewInstallerChecker(atomicInstaller, dir.NewChecker()) + if err != nil { + return nil, err + } + + return awaitable.NewInstaller(hooksInstaller, hooksInstaller) } diff --git a/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go b/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go index fdd374eb72a..3073da25ca4 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go @@ -37,7 +37,7 @@ func NewInstaller(config *artifact.Config) (*Installer, error) { // Install performs installation of program in a specific version. // It expects package to be already downloaded. -func (i *Installer) Install(_ context.Context, spec program.Spec, version, installDir string) error { +func (i *Installer) Install(ctx context.Context, spec program.Spec, version, installDir string) error { artifactPath, err := artifact.GetArtifactPath(spec, version, i.config.OS(), i.config.Arch(), i.config.TargetDirectory) if err != nil { return err @@ -49,7 +49,7 @@ func (i *Installer) Install(_ context.Context, spec program.Spec, version, insta os.RemoveAll(installDir) } - if err := i.unzip(artifactPath); err != nil { + if err := i.unzip(ctx, artifactPath); err != nil { return err } @@ -69,7 +69,7 @@ func (i *Installer) Install(_ context.Context, spec program.Spec, version, insta return nil } -func (i *Installer) unzip(artifactPath string) error { +func (i *Installer) unzip(ctx context.Context, artifactPath string) error { r, err := zip.OpenReader(artifactPath) if err != nil { return err @@ -120,6 +120,11 @@ func (i *Installer) unzip(artifactPath string) error { } for _, f := range r.File { + // if we were cancelled in between + if err := ctx.Err(); err != nil { + return err + } + if err := unpackFile(f); err != nil { return err } From 9c2ca6ccb501e91eabdf8c9f6dce2269fdbe0c9b Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 25 Feb 2021 18:09:29 +0100 Subject: [PATCH 2/9] a --- .../pkg/artifact/install/atomic/atomic_installer.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go index 1793ff30fb5..ef64f44a58f 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "os" "path/filepath" + "runtime" "sync" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" @@ -71,5 +72,13 @@ func (i *Installer) Install(ctx context.Context, spec program.Spec, version, ins return err } + // on windows rename is not atomic so if we were in cancellation process let's start over + // after restart + if err := ctx.Err(); runtime.GOOS == "windows" && err != nil { + os.RemoveAll(installDir) + os.RemoveAll(tempInstallDir) + return err + } + return nil } From 8a3cff16bd304291fbb845d763cc96901e8ecdfb Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 25 Feb 2021 18:28:31 +0100 Subject: [PATCH 3/9] files --- .../install/awaitable/awaitable_installer.go | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 x-pack/elastic-agent/pkg/artifact/install/awaitable/awaitable_installer.go diff --git a/x-pack/elastic-agent/pkg/artifact/install/awaitable/awaitable_installer.go b/x-pack/elastic-agent/pkg/artifact/install/awaitable/awaitable_installer.go new file mode 100644 index 00000000000..3525def1c7f --- /dev/null +++ b/x-pack/elastic-agent/pkg/artifact/install/awaitable/awaitable_installer.go @@ -0,0 +1,57 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awaitable + +import ( + "context" + "sync" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" +) + +type embeddedInstaller interface { + Install(ctx context.Context, spec program.Spec, version, installDir string) error +} + +type embeddedChecker interface { + Check(ctx context.Context, spec program.Spec, version, installDir string) error +} + +// Installer installs into temporary destination and moves to correct one after +// successful finish. +type Installer struct { + installer embeddedInstaller + checker embeddedChecker + wg sync.WaitGroup +} + +// NewInstaller creates a new AtomicInstaller +func NewInstaller(i embeddedInstaller, ch embeddedChecker) (*Installer, error) { + return &Installer{ + installer: i, + checker: ch, + }, nil +} + +// Wait allows caller to wait for install to be finished +func (i *Installer) Wait() { + i.wg.Wait() +} + +// Install performs installation of program in a specific version. +func (i *Installer) Install(ctx context.Context, spec program.Spec, version, installDir string) error { + i.wg.Add(1) + defer i.wg.Done() + + return i.installer.Install(ctx, spec, version, installDir) +} + +// Check performs installation checks +func (i *Installer) Check(ctx context.Context, spec program.Spec, version, installDir string) error { + i.wg.Add(1) + defer i.wg.Done() + + return i.checker.Check(ctx, spec, version, installDir) +} From 2684d9cf207283c64c7a194fe9923d78d5cb5e76 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 25 Feb 2021 19:32:13 +0100 Subject: [PATCH 4/9] tests --- .../install/atomic/atomic_installer.go | 45 ++++++++++++------- .../pkg/artifact/install/zip/zip_installer.go | 3 ++ 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go index ef64f44a58f..ae26ef80a35 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go @@ -10,10 +10,10 @@ import ( "os" "path/filepath" "runtime" - "sync" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" + "go.uber.org/multierr" ) type embeddedInstaller interface { @@ -24,7 +24,6 @@ type embeddedInstaller interface { // successful finish. type Installer struct { installer embeddedInstaller - wg sync.WaitGroup } // NewInstaller creates a new AtomicInstaller @@ -34,16 +33,8 @@ func NewInstaller(i embeddedInstaller) (*Installer, error) { }, nil } -// Allows caller to wait for install to be finished -func (i *Installer) Wait() { - i.wg.Wait() -} - // Install performs installation of program in a specific version. func (i *Installer) Install(ctx context.Context, spec program.Spec, version, installDir string) error { - i.wg.Add(1) - defer i.wg.Done() - // tar installer uses Dir of installDir to determine location of unpack tempDir, err := ioutil.TempDir(paths.TempDir(), "elastic-agent-install") if err != nil { @@ -62,22 +53,42 @@ func (i *Installer) Install(ctx context.Context, spec program.Spec, version, ins if err := i.installer.Install(ctx, spec, version, tempInstallDir); err != nil { // cleanup unfinished install - os.RemoveAll(tempInstallDir) + if rerr := os.RemoveAll(tempInstallDir); rerr != nil { + err = multierr.Append(err, rerr) + } return err } if err := os.Rename(tempInstallDir, installDir); err != nil { - os.RemoveAll(installDir) - os.RemoveAll(tempInstallDir) + if rerr := os.RemoveAll(installDir); rerr != nil { + err = multierr.Append(err, rerr) + } + if rerr := os.RemoveAll(tempInstallDir); rerr != nil { + err = multierr.Append(err, rerr) + } return err } // on windows rename is not atomic so if we were in cancellation process let's start over // after restart - if err := ctx.Err(); runtime.GOOS == "windows" && err != nil { - os.RemoveAll(installDir) - os.RemoveAll(tempInstallDir) - return err + if runtime.GOOS == "windows" { + // sync + f, err := os.OpenFile(installDir, os.O_SYNC|os.O_RDWR, 0755) + if err == nil { + f.Sync() + } + + // remove + if err := ctx.Err(); err != nil { + if rerr := os.RemoveAll(installDir); rerr != nil { + err = multierr.Append(err, rerr) + } + if rerr := os.RemoveAll(tempInstallDir); rerr != nil { + err = multierr.Append(err, rerr) + } + return err + } + } return nil diff --git a/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go b/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go index 3073da25ca4..6f0e158f364 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go @@ -64,6 +64,9 @@ func (i *Installer) Install(ctx context.Context, spec program.Spec, version, ins if err := os.Rename(rootDir, installDir); err != nil { return errors.New(err, errors.TypeFilesystem, errors.M(errors.MetaKeyPath, installDir)) } + + // sync dir + os.Open(installDir) } return nil From d09eb7b9ea2ecd7460d17a9faa30ad311c5ba423 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 25 Feb 2021 20:12:40 +0100 Subject: [PATCH 5/9] polish --- .../install/atomic/atomic_installer.go | 27 +++++-------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go index ae26ef80a35..046d88b81c8 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go @@ -13,7 +13,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" - "go.uber.org/multierr" + "github.com/hashicorp/go-multierror" ) type embeddedInstaller interface { @@ -54,41 +54,26 @@ func (i *Installer) Install(ctx context.Context, spec program.Spec, version, ins if err := i.installer.Install(ctx, spec, version, tempInstallDir); err != nil { // cleanup unfinished install if rerr := os.RemoveAll(tempInstallDir); rerr != nil { - err = multierr.Append(err, rerr) + err = multierror.Append(err, rerr) } return err } if err := os.Rename(tempInstallDir, installDir); err != nil { if rerr := os.RemoveAll(installDir); rerr != nil { - err = multierr.Append(err, rerr) + err = multierror.Append(err, rerr) } if rerr := os.RemoveAll(tempInstallDir); rerr != nil { - err = multierr.Append(err, rerr) + err = multierror.Append(err, rerr) } return err } - // on windows rename is not atomic so if we were in cancellation process let's start over - // after restart + // on windows rename is not atomic, let's force it to flush the cache if runtime.GOOS == "windows" { - // sync - f, err := os.OpenFile(installDir, os.O_SYNC|os.O_RDWR, 0755) - if err == nil { + if f, err := os.OpenFile(installDir, os.O_SYNC|os.O_RDWR, 0755); err == nil { f.Sync() } - - // remove - if err := ctx.Err(); err != nil { - if rerr := os.RemoveAll(installDir); rerr != nil { - err = multierr.Append(err, rerr) - } - if rerr := os.RemoveAll(tempInstallDir); rerr != nil { - err = multierr.Append(err, rerr) - } - return err - } - } return nil From f2d31c9422b2868bf4b4add4d0284cc96f72046b Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 25 Feb 2021 20:13:43 +0100 Subject: [PATCH 6/9] polish --- x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go b/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go index 6f0e158f364..3073da25ca4 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go @@ -64,9 +64,6 @@ func (i *Installer) Install(ctx context.Context, spec program.Spec, version, ins if err := os.Rename(rootDir, installDir); err != nil { return errors.New(err, errors.TypeFilesystem, errors.M(errors.MetaKeyPath, installDir)) } - - // sync dir - os.Open(installDir) } return nil From e22fac2ba89f98b9f20f741fb477fb73d0ccb578 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 25 Feb 2021 20:42:55 +0100 Subject: [PATCH 7/9] changelog --- x-pack/elastic-agent/CHANGELOG.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index 8f5efd70aa3..c0a0059e291 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -36,6 +36,7 @@ - Windows agent doesn't uninstall with a lowercase `c:` drive in the path {pull}23998[23998] - Fix reloading of log level for services {pull}[24055]24055 - Fix: Successfully installed and enrolled agent running standalone{pull}[24128]24128 +- Make installer atomic on windows {pull}[24253]24253 ==== New features From e69e68a2ab56fff841b6d3a8f8ea1f1b96732501 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 25 Feb 2021 22:58:53 +0100 Subject: [PATCH 8/9] fmt --- .../pkg/artifact/install/atomic/atomic_installer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go index 046d88b81c8..980bb2127f9 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go @@ -11,9 +11,10 @@ import ( "path/filepath" "runtime" + "github.com/hashicorp/go-multierror" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" - "github.com/hashicorp/go-multierror" ) type embeddedInstaller interface { From f297533a1601ab81b3a6e0489d38f0f609b08716 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 2 Mar 2021 09:10:51 +0100 Subject: [PATCH 9/9] tar installer cancells as well --- .../pkg/artifact/install/tar/tar_installer.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go b/x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go index 2452f5909cc..421a30bed4f 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go @@ -33,7 +33,7 @@ func NewInstaller(config *artifact.Config) (*Installer, error) { // Install performs installation of program in a specific version. // It expects package to be already downloaded. -func (i *Installer) Install(_ context.Context, spec program.Spec, version, installDir string) error { +func (i *Installer) Install(ctx context.Context, spec program.Spec, version, installDir string) error { artifactPath, err := artifact.GetArtifactPath(spec, version, i.config.OS(), i.config.Arch(), i.config.TargetDirectory) if err != nil { return err @@ -53,10 +53,10 @@ func (i *Installer) Install(_ context.Context, spec program.Spec, version, insta // unpack must occur in directory that holds the installation directory // or the extraction will be double nested - return unpack(f, filepath.Dir(installDir)) + return unpack(ctx, f, filepath.Dir(installDir)) } -func unpack(r io.Reader, dir string) error { +func unpack(ctx context.Context, r io.Reader, dir string) error { zr, err := gzip.NewReader(r) if err != nil { return errors.New("requires gzip-compressed body", err, errors.TypeFilesystem) @@ -66,6 +66,11 @@ func unpack(r io.Reader, dir string) error { var rootDir string for { + // exit and propagate cancellation err as soon as we know about it + if err := ctx.Err(); err != nil { + return err + } + f, err := tr.Next() if err == io.EOF { break