Skip to content

Commit

Permalink
[Ingest Manager] Download snapshot artifacts from snapshots repo (ela…
Browse files Browse the repository at this point in the history
…stic#18685)

[Ingest Manager] Download snapshot artifacts from snapshots repo (elastic#18685)
  • Loading branch information
michalpristas committed Jun 12, 2020
1 parent 38aec1c commit baaa581
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 6 deletions.
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config
return nil, err
}

fetcher := downloader.NewDownloader(operatorConfig.DownloadConfig)
verifier, err := downloader.NewVerifier(operatorConfig.DownloadConfig)
fetcher := downloader.NewDownloader(log, operatorConfig.DownloadConfig)
verifier, err := downloader.NewVerifier(log, operatorConfig.DownloadConfig)
if err != nil {
return nil, errors.New(err, "initiating verifier")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,27 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/composed"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/fs"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/http"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/snapshot"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)

// NewDownloader creates a downloader which first checks local directory
// and then fallbacks to remote if configured.
func NewDownloader(config *artifact.Config, downloaders ...download.Downloader) download.Downloader {
return composed.NewDownloader(fs.NewDownloader(config), http.NewDownloader(config))
func NewDownloader(log *logger.Logger, config *artifact.Config) download.Downloader {
downloaders := make([]download.Downloader, 0, 3)
downloaders = append(downloaders, fs.NewDownloader(config))

// try snapshot repo before official
if release.Snapshot() {
snapDownloader, err := snapshot.NewDownloader(config)
if err != nil {
log.Error(err)
} else {
downloaders = append(downloaders, snapDownloader)
}
}

downloaders = append(downloaders, http.NewDownloader(config))
return composed.NewDownloader(downloaders...)
}
22 changes: 20 additions & 2 deletions x-pack/elastic-agent/pkg/artifact/download/localremote/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,37 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/composed"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/fs"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/http"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/snapshot"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)

// NewVerifier creates a downloader which first checks local directory
// and then fallbacks to remote if configured.
func NewVerifier(config *artifact.Config, downloaders ...download.Downloader) (download.Verifier, error) {
func NewVerifier(log *logger.Logger, config *artifact.Config) (download.Verifier, error) {
verifiers := make([]download.Verifier, 0, 3)

fsVer, err := fs.NewVerifier(config)
if err != nil {
return nil, err
}
verifiers = append(verifiers, fsVer)

// try snapshot repo before official
if release.Snapshot() {
snapshotVerifier, err := snapshot.NewVerifier(config)
if err != nil {
log.Error(err)
} else {
verifiers = append(verifiers, snapshotVerifier)
}
}

remoteVer, err := http.NewVerifier(config)
if err != nil {
return nil, err
}
verifiers = append(verifiers, remoteVer)

return composed.NewVerifier(fsVer, remoteVer), nil
return composed.NewVerifier(verifiers...), nil
}
93 changes: 93 additions & 0 deletions x-pack/elastic-agent/pkg/artifact/download/snapshot/downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package snapshot

import (
"encoding/json"
"fmt"
gohttp "net/http"
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/http"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)

// NewDownloader creates a downloader which first checks local directory
// and then fallbacks to remote if configured.
func NewDownloader(config *artifact.Config) (download.Downloader, error) {
cfg, err := snapshotConfig(config)
if err != nil {
return nil, err
}
return http.NewDownloader(cfg), nil
}

func snapshotConfig(config *artifact.Config) (*artifact.Config, error) {
snapshotURI, err := snapshotURI()
if err != nil {
return nil, fmt.Errorf("failed to detect remote snapshot repo, proceeding with configured: %v", err)
}

return &artifact.Config{
OperatingSystem: config.OperatingSystem,
Architecture: config.Architecture,
BeatsSourceURI: snapshotURI,
TargetDirectory: config.TargetDirectory,
Timeout: config.Timeout,
PgpFile: config.PgpFile,
InstallPath: config.InstallPath,
DropPath: config.DropPath,
}, nil
}

func snapshotURI() (string, error) {
artifactsURI := fmt.Sprintf("https://artifacts-api.elastic.co/v1/search/%s-SNAPSHOT/elastic-agent", release.Version())
resp, err := gohttp.Get(artifactsURI)
if err != nil {
return "", err
}
defer resp.Body.Close()

body := struct {
Packages map[string]interface{} `json:"packages"`
}{}

dec := json.NewDecoder(resp.Body)
if err := dec.Decode(&body); err != nil {
return "", err
}

if len(body.Packages) == 0 {
return "", fmt.Errorf("no packages found in snapshot repo")
}

for k, pkg := range body.Packages {
pkgMap, ok := pkg.(map[string]interface{})
if !ok {
return "", fmt.Errorf("content of '%s' is not a map", k)
}

uriVal, found := pkgMap["url"]
if !found {
return "", fmt.Errorf("item '%s' does not contain url", k)
}

uri, ok := uriVal.(string)
if !ok {
return "", fmt.Errorf("uri is not a string")
}

index := strings.Index(uri, "/elastic-agent/")
if index == -1 {
return "", fmt.Errorf("not an agent uri: '%s'", uri)
}

return uri[:index], nil
}

return "", fmt.Errorf("uri not detected")
}
21 changes: 21 additions & 0 deletions x-pack/elastic-agent/pkg/artifact/download/snapshot/verifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package snapshot

import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/http"
)

// NewVerifier creates a downloader which first checks local directory
// and then fallbacks to remote if configured.
func NewVerifier(config *artifact.Config, downloaders ...download.Downloader) (download.Verifier, error) {
cfg, err := snapshotConfig(config)
if err != nil {
return nil, err
}
return http.NewVerifier(cfg)
}

0 comments on commit baaa581

Please sign in to comment.