Skip to content

Commit

Permalink
[Ingest Manager] Retryable downloads of beats (#19102) (#19156)
Browse files Browse the repository at this point in the history
[Ingest Manager] Retryable downloads of beats (#19102)
  • Loading branch information
michalpristas authored Jun 12, 2020
1 parent a3b8a4a commit eabb944
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 15 deletions.
95 changes: 95 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operation_retryable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package operation

import (
"context"
"fmt"
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry"
)

// retryableOperations consists of multiple operations which are
// retryable as a whole.
// if nth operation fails all preceding are retried as well
type retryableOperations struct {
logger *logger.Logger
operations []operation
retryConfig *retry.Config
}

func newRetryableOperations(
logger *logger.Logger,
retryConfig *retry.Config,
operations ...operation) *retryableOperations {

return &retryableOperations{
logger: logger,
retryConfig: retryConfig,
operations: operations,
}
}

// Name is human readable name identifying an operation
func (o *retryableOperations) Name() string {
names := make([]string, 0, len(o.operations))
for _, op := range o.operations {
names = append(names, op.Name())
}
return fmt.Sprintf("retryable block: %s", strings.Join(names, " "))
}

// Check checks whether operation needs to be run
// examples:
// - Start does not need to run if process is running
// - Fetch does not need to run if package is already present
func (o *retryableOperations) Check(application Application) (bool, error) {
for _, op := range o.operations {
// finish early if at least one operation needs to be run or errored out
if run, err := op.Check(application); err != nil || run {
return run, err
}
}

return false, nil
}

// Run runs the operation
func (o *retryableOperations) Run(ctx context.Context, application Application) (err error) {
return retry.Do(ctx, o.retryConfig, o.runOnce(application))
}

// Run runs the operation
func (o *retryableOperations) runOnce(application Application) func(context.Context) error {
return func(ctx context.Context) error {
for _, op := range o.operations {
if ctx.Err() != nil {
return ctx.Err()
}

shouldRun, err := op.Check(application)
if err != nil {
return err
}

if !shouldRun {
continue
}

o.logger.Debugf("running operation '%s' of the block '%s'", op.Name(), o.Name())
if err := op.Run(ctx, application); err != nil {
o.logger.Errorf("operation %s failed", op.Name())
return err
}
}

return nil
}
}

// check interface
var _ operation = &retryableOperations{}
8 changes: 6 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,12 @@ func (o *Operator) HandleConfig(cfg configrequest.Request) error {
// specific configuration of new process is passed
func (o *Operator) start(p Descriptor, cfg map[string]interface{}) (err error) {
flow := []operation{
newOperationFetch(o.logger, p, o.config, o.downloader, o.eventProcessor),
newOperationVerify(p, o.config, o.verifier, o.eventProcessor),
newRetryableOperations(
o.logger,
o.config.RetryConfig,
newOperationFetch(o.logger, p, o.config, o.downloader, o.eventProcessor),
newOperationVerify(p, o.config, o.verifier, o.eventProcessor),
),
newOperationInstall(o.logger, p, o.config, o.installer, o.eventProcessor),
newOperationStart(o.logger, p, o.config, cfg, o.eventProcessor),
newOperationConfig(o.logger, o.config, cfg, o.eventProcessor),
Expand Down
24 changes: 16 additions & 8 deletions x-pack/elastic-agent/pkg/artifact/download/fs/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,25 @@ func NewDownloader(config *artifact.Config) *Downloader {

// Download fetches the package from configured source.
// Returns absolute path to downloaded package and an error.
func (e *Downloader) Download(_ context.Context, programName, version string) (string, error) {
// create a destination directory root/program
destinationDir := filepath.Join(e.config.TargetDirectory, programName)
if err := os.MkdirAll(destinationDir, 0755); err != nil {
return "", errors.New(err, "creating directory for downloaded artifact failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, destinationDir))
}
func (e *Downloader) Download(_ context.Context, programName, version string) (_ string, err error) {
downloadedFiles := make([]string, 0, 2)
defer func() {
if err != nil {
for _, path := range downloadedFiles {
os.Remove(path)
}
}
}()

// download from source to dest
path, err := e.download(e.config.OS(), programName, version)
downloadedFiles = append(downloadedFiles, path)
if err != nil {
os.Remove(path)
return "", err
}

_, err = e.downloadHash(e.config.OS(), programName, version)
hashPath, err := e.downloadHash(e.config.OS(), programName, version)
downloadedFiles = append(downloadedFiles, hashPath)
return path, err
}

Expand Down Expand Up @@ -103,6 +107,10 @@ func (e *Downloader) downloadFile(filename, fullPath string) (string, error) {
defer destinationFile.Close()

_, err = io.Copy(destinationFile, sourceFile)
if err != nil {
return "", err
}

return fullPath, nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
sample
content
of
a
file
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9af9aa016f3349aa248034629e4336ca2f4d31317bfb8c9a23a9d924c18969cf43ad93727e784da010a272690b2b5ce4c4ded3a5d2039e4408e93e1e18d113db beat-8.0.0-darwin-x86_64.tar.gz
9 changes: 8 additions & 1 deletion x-pack/elastic-agent/pkg/artifact/download/fs/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ func (v *Verifier) Verify(programName, version string) (bool, error) {

fullPath := filepath.Join(v.config.TargetDirectory, filename)

return v.verifyHash(filename, fullPath)
isMatch, err := v.verifyHash(filename, fullPath)
if !isMatch || err != nil {
// remove bits so they can be redownloaded
os.Remove(fullPath)
os.Remove(fullPath + ".sha512")
}

return isMatch, err
}

func (v *Verifier) verifyHash(filename, fullPath string) (bool, error) {
Expand Down
97 changes: 97 additions & 0 deletions x-pack/elastic-agent/pkg/artifact/download/fs/verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
)

Expand All @@ -28,6 +30,101 @@ type testCase struct {
arch string
}

func TestFetchVerify(t *testing.T) {
timeout := 15 * time.Second
dropPath := filepath.Join("testdata", "drop")
installPath := filepath.Join("testdata", "install")
targetPath := filepath.Join("testdata", "download")
ctx := context.Background()
programName := "beat"
version := "8.0.0"

targetFilePath := filepath.Join(targetPath, "beat-8.0.0-darwin-x86_64.tar.gz")
hashTargetFilePath := filepath.Join(targetPath, "beat-8.0.0-darwin-x86_64.tar.gz.sha512")

// cleanup
defer os.RemoveAll(targetPath)

config := &artifact.Config{
TargetDirectory: targetPath,
DropPath: dropPath,
InstallPath: installPath,
Timeout: timeout,
OperatingSystem: "darwin",
Architecture: "32",
}

err := prepareFetchVerifyTests(dropPath, targetPath, targetFilePath, hashTargetFilePath)
assert.NoError(t, err)

downloader := NewDownloader(config)
verifier, err := NewVerifier(config)
assert.NoError(t, err)

// first download verify should fail:
// download skipped, as invalid package is prepared upfront
// verify fails and cleans download
matches, err := verifier.Verify(programName, version)
assert.NoError(t, err)
assert.Equal(t, false, matches)

_, err = os.Stat(targetFilePath)
assert.True(t, os.IsNotExist(err))

_, err = os.Stat(hashTargetFilePath)
assert.True(t, os.IsNotExist(err))

// second one should pass
// download not skipped: package missing
// verify passes because hash is not correct
_, err = downloader.Download(ctx, programName, version)
assert.NoError(t, err)

// file downloaded ok
_, err = os.Stat(targetFilePath)
assert.NoError(t, err)

_, err = os.Stat(hashTargetFilePath)
assert.NoError(t, err)

matches, err = verifier.Verify(programName, version)
assert.NoError(t, err)
assert.Equal(t, true, matches)
}

func prepareFetchVerifyTests(dropPath, targetDir, targetFilePath, hashTargetFilePath string) error {
sourceFilePath := filepath.Join(dropPath, "beat-8.0.0-darwin-x86_64.tar.gz")
hashSourceFilePath := filepath.Join(dropPath, "beat-8.0.0-darwin-x86_64.tar.gz.sha512")

// clean targets
os.Remove(targetFilePath)
os.Remove(hashTargetFilePath)

if err := os.MkdirAll(targetDir, 0775); err != nil {
return err
}

sourceFile, err := os.Open(sourceFilePath)
if err != nil {
return err
}
defer sourceFile.Close()

targretFile, err := os.OpenFile(targetFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
if err != nil {
return err
}
defer targretFile.Close()

hashContent, err := ioutil.ReadFile(hashSourceFilePath)
if err != nil {
return err
}

corruptedHash := append([]byte{1, 2, 3, 4, 5, 6}, hashContent[6:]...)
return ioutil.WriteFile(hashTargetFilePath, corruptedHash, 0666)
}

func TestVerify(t *testing.T) {
targetDir, err := ioutil.TempDir(os.TempDir(), "")
if err != nil {
Expand Down
16 changes: 13 additions & 3 deletions x-pack/elastic-agent/pkg/artifact/download/http/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,25 @@ func NewDownloaderWithClient(config *artifact.Config, client http.Client) *Downl

// Download fetches the package from configured source.
// Returns absolute path to downloaded package and an error.
func (e *Downloader) Download(ctx context.Context, programName, version string) (string, error) {
func (e *Downloader) Download(ctx context.Context, programName, version string) (_ string, err error) {
downloadedFiles := make([]string, 0, 2)
defer func() {
if err != nil {
for _, path := range downloadedFiles {
os.Remove(path)
}
}
}()

// download from source to dest
path, err := e.download(ctx, e.config.OS(), programName, version)
downloadedFiles = append(downloadedFiles, path)
if err != nil {
os.Remove(path)
return "", err
}

_, err = e.downloadHash(ctx, e.config.OS(), programName, version)
hashPath, err := e.downloadHash(ctx, e.config.OS(), programName, version)
downloadedFiles = append(downloadedFiles, hashPath)
return path, err
}

Expand Down
9 changes: 8 additions & 1 deletion x-pack/elastic-agent/pkg/artifact/download/http/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,14 @@ func (v *Verifier) Verify(programName, version string) (bool, error) {
return false, errors.New(err, "retrieving package path")
}

return v.verifyHash(filename, fullPath)
isMatch, err := v.verifyHash(filename, fullPath)
if !isMatch || err != nil {
// remove bits so they can be redownloaded
os.Remove(fullPath)
os.Remove(fullPath + ".sha512")
}

return isMatch, err
}

func (v *Verifier) verifyHash(filename, fullPath string) (bool, error) {
Expand Down

0 comments on commit eabb944

Please sign in to comment.