Skip to content

Commit

Permalink
Fixed: source uri reload for download/verify components (#1252)
Browse files Browse the repository at this point in the history
Fixed: source uri reload for download/verify components (#1252)
  • Loading branch information
michalpristas authored Sep 26, 2022
1 parent 7af8092 commit 2b6cfdc
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
- Fix a panic caused by a race condition when installing the Elastic Agent. {issues}806[806]
- Use at least warning level for all status logs {pull}1218[1218]
- Remove fleet event reporter and events from checkin body. {issue}993[993]
- Fix unintended reset of source URI when downloading components {pull}1252[1252]

==== New features

Expand Down
39 changes: 39 additions & 0 deletions internal/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,52 @@ func (o *Operator) Reload(rawConfig *config.Config) error {
return errors.New(err, "failed to unpack artifact config")
}

sourceURI, err := reloadSourceURI(o.logger, rawConfig)
if err != nil {
return errors.New(err, "failed to parse source URI")
}
tmp.C.SourceURI = sourceURI

if err := o.reloadComponent(o.downloader, "downloader", tmp.C); err != nil {
return err
}

return o.reloadComponent(o.verifier, "verifier", tmp.C)
}

func reloadSourceURI(logger *logger.Logger, rawConfig *config.Config) (string, error) {
type reloadConfig struct {
// SourceURI: source of the artifacts, e.g https://artifacts.elastic.co/downloads/
SourceURI string `json:"agent.download.sourceURI" config:"agent.download.sourceURI"`

// FleetSourceURI: source of the artifacts, e.g https://artifacts.elastic.co/downloads/ coming from fleet which uses
// different naming.
FleetSourceURI string `json:"agent.download.source_uri" config:"agent.download.source_uri"`
}
cfg := &reloadConfig{}
if err := rawConfig.Unpack(&cfg); err != nil {
return "", errors.New(err, "failed to unpack config during reload")
}

var newSourceURI string
if fleetURI := strings.TrimSpace(cfg.FleetSourceURI); fleetURI != "" {
// fleet configuration takes precedence
newSourceURI = fleetURI
} else if sourceURI := strings.TrimSpace(cfg.SourceURI); sourceURI != "" {
newSourceURI = sourceURI
}

if newSourceURI != "" {
logger.Infof("Source URI in operator changed to %q", newSourceURI)
return newSourceURI, nil
}

// source uri unset, reset to default
logger.Infof("Source URI in reset %q", artifact.DefaultSourceURI)
return artifact.DefaultSourceURI, nil

}

func (o *Operator) reloadComponent(component interface{}, name string, cfg *artifact.Config) error {
r, ok := component.(artifact.ConfigReloader)
if !ok {
Expand Down
64 changes: 58 additions & 6 deletions internal/pkg/agent/operation/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/artifact"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/core/state"
)

Expand Down Expand Up @@ -71,7 +74,7 @@ func TestConfigurableRun(t *testing.T) {
if err := operator.start(p, nil); err != nil {
t.Fatal(err)
}
defer operator.stop(p) // failure catch, to ensure no sub-process stays running
defer func() { _ = operator.stop(p) }() // failure catch, to ensure no sub-process stays running

waitFor(t, func() error {
items := operator.State()
Expand All @@ -87,6 +90,7 @@ func TestConfigurableRun(t *testing.T) {

// try to configure
cfg := make(map[string]interface{})
//nolint:gosec // rand is ok for test
tstFilePath := filepath.Join(os.TempDir(), fmt.Sprintf("tmp%d", rand.Uint32()))
cfg["TestFile"] = tstFilePath
if err := operator.pushConfig(p, cfg); err != nil {
Expand Down Expand Up @@ -145,7 +149,7 @@ func TestConfigurableFailed(t *testing.T) {
if err := operator.start(p, nil); err != nil {
t.Fatal(err)
}
defer operator.stop(p) // failure catch, to ensure no sub-process stays running
defer func() { _ = operator.stop(p) }() // failure catch, to ensure no sub-process stays running

var pid int
waitFor(t, func() error {
Expand All @@ -172,6 +176,7 @@ func TestConfigurableFailed(t *testing.T) {

// try to configure (with failed status)
cfg := make(map[string]interface{})
//nolint:gosec // rand is ok for test
tstFilePath := filepath.Join(os.TempDir(), fmt.Sprintf("tmp%d", rand.Uint32()))
cfg["TestFile"] = tstFilePath
cfg["Status"] = proto.StateObserved_FAILED
Expand Down Expand Up @@ -254,7 +259,7 @@ func TestConfigurableCrash(t *testing.T) {
if err := operator.start(p, nil); err != nil {
t.Fatal(err)
}
defer operator.stop(p) // failure catch, to ensure no sub-process stays running
defer func() { _ = operator.stop(p) }() // failure catch, to ensure no sub-process stays running

var pid int
waitFor(t, func() error {
Expand All @@ -272,6 +277,7 @@ func TestConfigurableCrash(t *testing.T) {

// try to configure (with failed status)
cfg := make(map[string]interface{})
//nolint:gosec // rand is ok for test
tstFilePath := filepath.Join(os.TempDir(), fmt.Sprintf("tmp%d", rand.Uint32()))
cfg["TestFile"] = tstFilePath
cfg["Crash"] = true
Expand Down Expand Up @@ -352,7 +358,7 @@ func TestConfigurableStartStop(t *testing.T) {
p := getProgram("configurable", "1.0")

operator := getTestOperator(t, downloadPath, installPath, p)
defer operator.stop(p) // failure catch, to ensure no sub-process stays running
defer func() { _ = operator.stop(p) }() // failure catch, to ensure no sub-process stays running

// start and stop it 3 times
for i := 0; i < 3; i++ {
Expand Down Expand Up @@ -396,11 +402,11 @@ func TestConfigurableService(t *testing.T) {
if err := operator.start(p, nil); err != nil {
t.Fatal(err)
}
defer operator.stop(p) // failure catch, to ensure no sub-process stays running
defer func() { _ = operator.stop(p) }() // failure catch, to ensure no sub-process stays running

// emulating a service, so we need to start the binary here in the test
spec := p.ProcessSpec()
cmd := exec.Command(spec.BinaryPath, fmt.Sprintf("%d", p.ServicePort()))
cmd := exec.Command(spec.BinaryPath, fmt.Sprintf("%d", p.ServicePort())) //nolint:gosec,G204 // this is fine
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Dir = filepath.Dir(spec.BinaryPath)
cmd.Stdout = os.Stdout
Expand All @@ -423,6 +429,7 @@ func TestConfigurableService(t *testing.T) {

// try to configure
cfg := make(map[string]interface{})
//nolint:gosec // rand is ok for test
tstFilePath := filepath.Join(os.TempDir(), fmt.Sprintf("tmp%d", rand.Uint32()))
cfg["TestFile"] = tstFilePath
if err := operator.pushConfig(p, cfg); err != nil {
Expand Down Expand Up @@ -462,6 +469,51 @@ func TestConfigurableService(t *testing.T) {
}
}

func TestReloadSourceURI(t *testing.T) {
testCases := map[string]struct {
IncomingConfig map[string]interface{}
ExpectedSourceURI string
}{
"no-config": {
IncomingConfig: map[string]interface{}{},
ExpectedSourceURI: artifact.DefaultSourceURI,
},
"source-uri-provided": {
IncomingConfig: map[string]interface{}{
"agent.download.sourceURI": "http://source-uri",
},
ExpectedSourceURI: "http://source-uri",
},
"fleet-source-uri-provided": {
IncomingConfig: map[string]interface{}{
"agent.download.source_uri": "http://fleet-source-uri",
},
ExpectedSourceURI: "http://fleet-source-uri",
},
"both-source-uri-provided": {
IncomingConfig: map[string]interface{}{
"agent.download.sourceURI": "http://source-uri",
"agent.download.source_uri": "http://fleet-source-uri",
},
ExpectedSourceURI: "http://fleet-source-uri",
},
}

l := getLogger()
for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
cfg, err := config.NewConfigFrom(tc.IncomingConfig)
require.NoError(t, err)
require.NotNil(t, cfg)

sourceUri, err := reloadSourceURI(l, cfg)
require.NoError(t, err)
require.Equal(t, tc.ExpectedSourceURI, sourceUri)

})
}
}

func isAvailable(name, version string) error {
p := getProgram(name, version)
spec := p.ProcessSpec()
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/artifact/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
linux = "linux"
windows = "windows"

defaultSourceURI = "https://artifacts.elastic.co/downloads/"
DefaultSourceURI = "https://artifacts.elastic.co/downloads/"
)

type ConfigReloader interface {
Expand Down Expand Up @@ -139,8 +139,8 @@ func (r *Reloader) reloadSourceURI(rawConfig *config.Config) error {
r.cfg.SourceURI = newSourceURI
} else {
// source uri unset, reset to default
r.log.Infof("Source URI reset from %q to %q", r.cfg.SourceURI, defaultSourceURI)
r.cfg.SourceURI = defaultSourceURI
r.log.Infof("Source URI reset from %q to %q", r.cfg.SourceURI, DefaultSourceURI)
r.cfg.SourceURI = DefaultSourceURI
}

return nil
Expand All @@ -156,7 +156,7 @@ func DefaultConfig() *Config {
transport.Timeout = 10 * time.Minute

return &Config{
SourceURI: defaultSourceURI,
SourceURI: DefaultSourceURI,
TargetDirectory: paths.Downloads(),
InstallPath: paths.Install(),
HTTPTransportSettings: transport,
Expand Down

0 comments on commit 2b6cfdc

Please sign in to comment.