Skip to content

Commit

Permalink
[Upgrade Details] Ensure details report UPG_WATCHING for the entire…
Browse files Browse the repository at this point in the history
… time that the upgrade is being watched (elastic#3827)

* Don't set upgrade details when creating upgrade marker

* Set UPG_WATCHING state right before starting to watch upgrade

* Log upgrade details whenever they're set on the coordinator

* Fix logging location

* Revert "Don't set upgrade details when creating upgrade marker"

This reverts commit 6821832.

* Fix logic with assuming UPG_ROLLBACK state

* Add FIXME

* Correctly observe upgrade details changes

* Update unit test

* Include upgrade details in status output

* Check upgrade details state before and after upgrade watcher starts

* Check that upgrade details have been cleared out upon successful upgrade

* Update unit test

* Fixing up upgrade integration tests

* Add unit test + fix details object being used

* Define AgentStatusOutput.IsZero() and use it

* Make sure Marker Watcher accounts for `UPG_COMPLETED` state

* Fix location of assertion

* Fix error message

* Join errors for wrapping

* Debugging why TestStandaloneDowngradeToSpecificSnapshotBuild is failing

* Cast string to details.State

* Remove version override debugging

* Wrap bugfix assertions in version checks

* Introduce upgradetest.WithDisableUpgradeWatcherUpgradeDetailsCheck option

* Call option function

* Debugging

* Fixing version check logic

* Remove debugging statements
  • Loading branch information
ycombinator authored Dec 19, 2023
1 parent cc81e4b commit ad7e1b5
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 49 deletions.
1 change: 0 additions & 1 deletion internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str
}
det := details.NewDetails(version, details.StateRequested, actionID)
det.RegisterObserver(c.SetUpgradeDetails)
det.RegisterObserver(c.logUpgradeDetails)

cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func (c *Coordinator) setOverrideState(overrideState *coordinatorOverrideState)
func (c *Coordinator) setUpgradeDetails(upgradeDetails *details.Details) {
c.state.UpgradeDetails = upgradeDetails
c.stateNeedsRefresh = true

c.logUpgradeDetails(upgradeDetails)
}

// Forward the current state to the broadcaster and clear the stateNeedsRefresh
Expand Down
17 changes: 16 additions & 1 deletion internal/pkg/agent/application/upgrade/marker_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type MarkerFileWatcher struct {
updateCh chan UpdateMarker

upgradeStarted atomic.Bool
lastMarker *UpdateMarker
}

func newMarkerFileWatcher(upgradeMarkerFilePath string, logger *logger.Logger) MarkerWatcher {
Expand Down Expand Up @@ -100,6 +101,16 @@ func (mfw *MarkerFileWatcher) Run(ctx context.Context) error {
// Upgrade marker file was created or updated; read its contents
// and send them over the update channel.
mfw.processMarker(version.GetAgentPackageVersion(), version.Commit())
case e.Op&(fsnotify.Remove) != 0:
// Upgrade marker file was removed.
// - Upgrade could've been rolled back
// - Upgrade could've been successful
// If last known Upgrade Details state is not `UPG_ROLLBACK`, assume
// upgrade was successful
if mfw.lastMarker != nil && mfw.lastMarker.Details != nil && mfw.lastMarker.Details.State != details.StateRollback {
mfw.lastMarker.Details = nil
mfw.updateCh <- *mfw.lastMarker
}
}
case <-doInitialRead:
mfw.processMarker(version.GetAgentPackageVersion(), version.Commit())
Expand Down Expand Up @@ -129,12 +140,16 @@ func (mfw *MarkerFileWatcher) processMarker(currentVersion string, commit string
// isn't for some reason, we fallback to explicitly setting that state as
// part of the upgrade details in the marker.
if marker.PrevVersion == currentVersion && marker.PrevHash == commit && !mfw.upgradeStarted.Load() {
// If there are no upgrade details in the marker or the state in the
// details is not set for some reason, we assume the worst and
// explicitly set the state to UPG_ROLLBACK
if marker.Details == nil {
marker.Details = details.NewDetails("unknown", details.StateRollback, marker.GetActionID())
} else {
} else if marker.Details.State == "" {
marker.Details.SetState(details.StateRollback)
}
}

mfw.lastMarker = marker
mfw.updateCh <- *marker
}
8 changes: 4 additions & 4 deletions internal/pkg/agent/application/upgrade/marker_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,18 @@ details:
State: details.StateRollback,
},
},
"same_version_with_details_wrong_state": {
"same_version_with_details_some_state": {
markerFileContents: `
prev_version: 8.9.2
details:
target_version: 8.9.2
state: UPG_WATCHING
state: UPG_REPLACING
`,
upgradeStarted: false,
expectedErrLogMsg: false,
expectedDetails: &details.Details{
TargetVersion: "8.9.2",
State: details.StateRollback,
State: details.StateReplacing,
},
},
"different_version": {
Expand Down Expand Up @@ -185,7 +185,7 @@ details:
expectedErrLogMsg: false,
expectedDetails: &details.Details{
TargetVersion: "8.9.2",
State: details.StateRollback,
State: details.StateWatching,
},
},
"same_version_same_hash_no_details": {
Expand Down
1 change: 0 additions & 1 deletion internal/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string
return nil, err
}

det.SetState(details.StateWatching)
if err := u.markUpgrade(ctx, u.log, newHash, action, det); err != nil {
u.log.Errorw("Rolling back: marking upgrade failed", "error.message", err)
rollbackInstall(ctx, u.log, newHash)
Expand Down
56 changes: 23 additions & 33 deletions internal/pkg/agent/cmd/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/release"
"github.com/elastic/elastic-agent/pkg/core/logger"
agtversion "github.com/elastic/elastic-agent/pkg/version"
"github.com/elastic/elastic-agent/version"
)

Expand Down Expand Up @@ -105,51 +104,26 @@ func watchCmd(log *logp.Logger, cfg *configuration.Configuration) error {
return nil
}

// About to start watching the upgrade. Initialize upgrade details and save them in the
// upgrade marker.
upgradeDetails := initUpgradeDetails(marker, upgrade.SaveMarker, log)

errorCheckInterval := cfg.Settings.Upgrade.Watcher.ErrorCheck.Interval
ctx := context.Background()
if err := watch(ctx, tilGrace, errorCheckInterval, log); err != nil {
log.Error("Error detected, proceeding to rollback: %v", err)

// If we are upgrading from version >= 8.12.0, marker.Details should be non-nil
// because the Agent we upgraded FROM would've written upgrade details in the upgrade
// marker. However, if we're upgrading from version < 8.12.0, the marker won't
// contain upgrade details, so we populate them now.
if marker.Details == nil {
fromVersion, err := agtversion.ParseVersion(marker.PrevVersion)
if err != nil {
log.Warnf("upgrade details are nil, but unable to parse version being upgraded from [%s]: %s", marker.PrevVersion, err.Error())
} else if fromVersion.Less(*agtversion.NewParsedSemVer(8, 12, 0, "", "")) {
log.Warnf("upgrade details are unexpectedly nil, upgrading from version [%s]", marker.PrevVersion)
}

marker.Details = details.NewDetails(version.GetAgentPackageVersion(), details.StateRollback, marker.GetActionID())
}

marker.Details.SetState(details.StateRollback)
err = upgrade.SaveMarker(marker, true)
if err != nil {
log.Errorf("unable to save upgrade marker before attempting to rollback: %s", err.Error())
}

upgradeDetails.SetState(details.StateRollback)
err = upgrade.Rollback(ctx, log, marker.PrevHash, marker.Hash)
if err != nil {
log.Error("rollback failed", err)

marker.Details.Fail(err)
err = upgrade.SaveMarker(marker, true)
if err != nil {
log.Errorf("unable to save upgrade marker after rollback failed: %s", err.Error())
}
upgradeDetails.Fail(err)
}
return err
}

// watch succeeded - upgrade was successful!
marker.Details.SetState(details.StateCompleted)
err = upgrade.SaveMarker(marker, false)
if err != nil {
log.Errorf("unable to save upgrade marker after successful watch: %s", err.Error())
}
upgradeDetails.SetState(details.StateCompleted)

// cleanup older versions,
// in windows it might leave self untouched, this will get cleaned up
Expand Down Expand Up @@ -260,3 +234,19 @@ func getConfig(streams *cli.IOStreams) *configuration.Configuration {

return cfg
}

func initUpgradeDetails(marker *upgrade.UpdateMarker, saveMarker func(*upgrade.UpdateMarker, bool) error, log *logp.Logger) *details.Details {
upgradeDetails := details.NewDetails(version.GetAgentPackageVersion(), details.StateWatching, marker.GetActionID())
upgradeDetails.RegisterObserver(func(details *details.Details) {
marker.Details = details
if err := saveMarker(marker, true); err != nil {
if details != nil {
log.Errorf("unable to save upgrade marker after setting upgrade details (state = %s): %s", details.State, err.Error())
} else {
log.Errorf("unable to save upgrade marker after clearing upgrade details: %s", err.Error())
}
}
})

return upgradeDetails
}
78 changes: 78 additions & 0 deletions internal/pkg/agent/cmd/watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cmd

import (
"testing"

"go.uber.org/zap/zapcore"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"

"github.com/elastic/elastic-agent/internal/pkg/fleetapi"

"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent/pkg/core/logger"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
)

func TestInitUpgradeDetails(t *testing.T) {
testMarker := &upgrade.UpdateMarker{
Action: &fleetapi.ActionUpgrade{
ActionID: "foobar",
},
}

saveCount := 0
mockSaveMarker := func(marker *upgrade.UpdateMarker, _ bool) error {
saveCount++
if saveCount <= 3 {
testMarker = marker
return nil
}
return errors.New("some error")
}

log, obs := logger.NewTesting("initUpgradeDetails")

upgradeDetails := initUpgradeDetails(testMarker, mockSaveMarker, log)

// Verify initial state
require.NotNil(t, testMarker.Details)
require.Equal(t, details.StateWatching, testMarker.Details.State)
require.Equal(t, 0, obs.Len())

// Verify state after changing details state
upgradeDetails.SetState(details.StateRollback)
require.NotNil(t, testMarker.Details)
require.Equal(t, details.StateRollback, testMarker.Details.State)
require.Equal(t, 0, obs.Len())

// Verify state after clearing details state
upgradeDetails.SetState(details.StateCompleted)
require.Nil(t, testMarker.Details)
require.Equal(t, 0, obs.Len())

// Verify state after changing details state and there's an
// error saving the marker
upgradeDetails.SetState(details.StateRollback)
require.NotNil(t, testMarker.Details)
require.Equal(t, 1, obs.Len())
logs := obs.TakeAll()
require.Equal(t, zapcore.ErrorLevel, logs[0].Level)
require.Equal(t, `unable to save upgrade marker after setting upgrade details (state = UPG_ROLLBACK): some error`, logs[0].Message)

// Verify state after clearing details state and there's an
// error saving the marker
upgradeDetails.SetState(details.StateCompleted)
require.Nil(t, testMarker.Details)
require.Equal(t, 1, obs.Len())
logs = obs.TakeAll()
require.Equal(t, zapcore.ErrorLevel, logs[0].Level)
require.Equal(t, `unable to save upgrade marker after clearing upgrade details: some error`, logs[0].Message)
}
18 changes: 12 additions & 6 deletions pkg/testing/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
"testing"
"time"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"

"github.com/otiai10/copy"
"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/control"
"github.com/elastic/elastic-agent/pkg/control/v2/client"
Expand Down Expand Up @@ -572,9 +572,10 @@ func (e *ExecErr) Unwrap() error {
// ExecStatus executes the status subcommand on the prepared Elastic Agent binary.
// It returns the parsed output and the error from the execution. Keep in mind
// the agent exits with status 1 if it's unhealthy, but it still outputs the
// status successfully. Therefore, a not empty AgentStatusOutput is valid
// status successfully. Therefore, a non-empty AgentStatusOutput is valid
// regardless of the error. An empty AgentStatusOutput and non nil error
// means the output could not be parsed.
// means the output could not be parsed. Use AgentStatusOutput.IsZero() to
// determine if the returned AgentStatusOutput is empty or not.
// It should work with any 8.6+ agent
func (f *Fixture) ExecStatus(ctx context.Context, opts ...process.CmdOption) (AgentStatusOutput, error) {
out, err := f.Exec(ctx, []string{"status", "--output", "json"}, opts...)
Expand Down Expand Up @@ -1007,8 +1008,13 @@ type AgentStatusOutput struct {
} `json:"meta"`
} `json:"version_info,omitempty"`
} `json:"components"`
FleetState int `json:"FleetState"`
FleetMessage string `json:"FleetMessage"`
FleetState int `json:"FleetState"`
FleetMessage string `json:"FleetMessage"`
UpgradeDetails *details.Details `json:"upgrade_details"`
}

func (aso *AgentStatusOutput) IsZero() bool {
return aso.Info.ID == ""
}

type AgentInspectOutput struct {
Expand Down
6 changes: 5 additions & 1 deletion testing/integration/upgrade_downgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func TestStandaloneDowngradeToSpecificSnapshotBuild(t *testing.T) {

t.Logf("Testing Elastic Agent upgrade from %s to %s...", define.Version(), endParsedVersion.String())

err = upgradetest.PerformUpgrade(ctx, startFixture, endFixture, t)
// We pass the upgradetest.WithDisableUpgradeWatcherUpgradeDetailsCheck option here because the endFixture
// is fetched from the artifacts API and it may not contain changes in the Upgrade Watcher whose effects are
// being asserted upon in upgradetest.PerformUpgrade.
// TODO: Stop passing this option and remove these comments once 8.13.0 has been released.
err = upgradetest.PerformUpgrade(ctx, startFixture, endFixture, t, upgradetest.WithDisableUpgradeWatcherUpgradeDetailsCheck())
assert.NoError(t, err)
}
4 changes: 2 additions & 2 deletions testing/integration/upgrade_rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ inputs:
require.NoError(t, err)

require.NotNil(t, state.UpgradeDetails)
require.Equal(t, details.StateRollback, state.UpgradeDetails.State)
require.Equal(t, details.StateRollback, details.State(state.UpgradeDetails.State))
}

// rollback should stop the watcher
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestStandaloneUpgradeRollbackOnRestarts(t *testing.T) {
require.NoError(t, err)

require.NotNil(t, state.UpgradeDetails)
require.Equal(t, details.StateRollback, state.UpgradeDetails.State)
require.Equal(t, details.StateRollback, details.State(state.UpgradeDetails.State))
}

// rollback should stop the watcher
Expand Down
Loading

0 comments on commit ad7e1b5

Please sign in to comment.