Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Agent upgrade 8.2->8.3 #578

Merged
merged 6 commits into from
Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 72 additions & 5 deletions internal/pkg/agent/application/upgrade/step_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,58 @@ type UpdateMarker struct {
Action *fleetapi.ActionUpgrade `json:"action" yaml:"action"`
}

// MarkerActionUpgrade adapter struct compatible with pre 8.3 version of the marker file format
type MarkerActionUpgrade struct {
ActionID string `yaml:"id"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a part of the update failed because I changed the serialization of the upgrade action with

type ActionUpgrade struct {
ActionID string `yaml:"action_id"`

I tried to make it more inline in fleet-server/model/schema.json (though fleet-server will remap action_id to id when giving an agent its actions on checkin).
Should we revisit that?

ActionType string `yaml:"type"`
Version string `yaml:"version"`
SourceURI string `yaml:"source_uri,omitempty"`
}

func convertToMarkerAction(a *fleetapi.ActionUpgrade) *MarkerActionUpgrade {
if a == nil {
return nil
}
return &MarkerActionUpgrade{
ActionID: a.ActionID,
ActionType: a.ActionType,
Version: a.Version,
SourceURI: a.SourceURI,
}
}

func convertToActionUpgrade(a *MarkerActionUpgrade) *fleetapi.ActionUpgrade {
if a == nil {
return nil
}
return &fleetapi.ActionUpgrade{
ActionID: a.ActionID,
ActionType: a.ActionType,
Version: a.Version,
SourceURI: a.SourceURI,
}
}

type updateMarkerSerializer struct {
Hash string `yaml:"hash"`
UpdatedOn time.Time `yaml:"updated_on"`
PrevVersion string `yaml:"prev_version"`
PrevHash string `yaml:"prev_hash"`
Acked bool `yaml:"acked"`
Action *MarkerActionUpgrade `yaml:"action"`
}

func newMarkerSerializer(m *UpdateMarker) *updateMarkerSerializer {
return &updateMarkerSerializer{
Hash: m.Hash,
UpdatedOn: m.UpdatedOn,
PrevVersion: m.PrevVersion,
PrevHash: m.PrevHash,
Acked: m.Acked,
Action: convertToMarkerAction(m.Action),
}
}

// markUpgrade marks update happened so we can handle grace period
func (u *Upgrader) markUpgrade(_ context.Context, hash string, action Action) error {
prevVersion := release.Version()
Expand All @@ -46,15 +98,15 @@ func (u *Upgrader) markUpgrade(_ context.Context, hash string, action Action) er
prevHash = prevHash[:hashLen]
}

marker := UpdateMarker{
marker := &UpdateMarker{
Hash: hash,
UpdatedOn: time.Now(),
PrevVersion: prevVersion,
PrevHash: prevHash,
Action: action.FleetAction(),
}

markerBytes, err := yaml.Marshal(marker)
markerBytes, err := yaml.Marshal(newMarkerSerializer(marker))
if err != nil {
return errors.New(err, errors.TypeConfig, "failed to parse marker file")
}
Expand Down Expand Up @@ -103,16 +155,31 @@ func LoadMarker() (*UpdateMarker, error) {
return nil, err
}

marker := &UpdateMarker{}
marker := &updateMarkerSerializer{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this work if marker already contains new form of action ID?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the action_id breakage is new and specific to 8.3 branch, it was not releases as a part of the 8.2.x, I tested the upgrade between latest 8.2.4 and 8.3

if err := yaml.Unmarshal(markerBytes, &marker); err != nil {
return nil, err
}

return marker, nil
return &UpdateMarker{
Hash: marker.Hash,
UpdatedOn: marker.UpdatedOn,
PrevVersion: marker.PrevVersion,
PrevHash: marker.PrevHash,
Acked: marker.Acked,
Action: convertToActionUpgrade(marker.Action),
}, nil
}

func saveMarker(marker *UpdateMarker) error {
markerBytes, err := yaml.Marshal(marker)
makerSerializer := &updateMarkerSerializer{
Hash: marker.Hash,
UpdatedOn: marker.UpdatedOn,
PrevVersion: marker.PrevVersion,
PrevHash: marker.PrevHash,
Acked: marker.Acked,
Action: convertToMarkerAction(marker.Action),
}
markerBytes, err := yaml.Marshal(makerSerializer)
if err != nil {
return err
}
Expand Down
76 changes: 2 additions & 74 deletions internal/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package upgrade

import (
"bytes"
"context"
"fmt"
"io/ioutil"
Expand All @@ -20,10 +19,8 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/secret"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/artifact"
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
"github.com/elastic/elastic-agent/internal/pkg/core/state"
Expand Down Expand Up @@ -173,10 +170,6 @@ func (u *Upgrader) Upgrade(ctx context.Context, a Action, reexecNow bool) (_ ree
return nil, errors.New(err, "failed to copy action store")
}

if err := encryptConfigIfNeeded(u.log, newHash); err != nil {
return nil, errors.New(err, "failed to encrypt the configuration")
}

if err := ChangeSymlink(ctx, newHash); err != nil {
rollbackInstall(ctx, newHash)
return nil, err
Expand Down Expand Up @@ -220,6 +213,8 @@ func (u *Upgrader) Ack(ctx context.Context) error {
return err
}

marker.Acked = true
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michel-laterman this seems to be needed here, since the action was just acked so we can persist the market with correct "acked" value. Please double check.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is correct, however I didn't alter any of the upgrade action steps when I added the queue


return saveMarker(marker)
}

Expand Down Expand Up @@ -335,73 +330,6 @@ func copyVault(newHash string) error {
return nil
}

// Create the key if it doesn't exist and encrypt the fleet.yml and state.yml
func encryptConfigIfNeeded(log *logger.Logger, newHash string) (err error) {
vaultPath := getVaultPath(newHash)

err = secret.CreateAgentSecret(secret.WithVaultPath(vaultPath))
if err != nil {
return err
}

newHome := filepath.Join(filepath.Dir(paths.Home()), fmt.Sprintf("%s-%s", agentName, newHash))
ymlStateStorePath := filepath.Join(newHome, filepath.Base(paths.AgentStateStoreYmlFile()))
stateStorePath := filepath.Join(newHome, filepath.Base(paths.AgentStateStoreFile()))

files := []struct {
Src string
Dst string
}{
{
Src: ymlStateStorePath,
Dst: stateStorePath,
},
{
Src: paths.AgentConfigYmlFile(),
Dst: paths.AgentConfigFile(),
},
}
for _, f := range files {
var b []byte
b, err = ioutil.ReadFile(f.Src)
if err != nil {
if os.IsNotExist(err) {
continue
}
return err
}

// Encrypt yml file
store := storage.NewEncryptedDiskStore(f.Dst, storage.WithVaultPath(vaultPath))
err = store.Save(bytes.NewReader(b))
if err != nil {
return err
}

// Remove yml file if no errors
defer func(fp string) {
if err != nil {
return
}
if rerr := os.Remove(fp); rerr != nil {
log.Warnf("failed to remove file: %s, err: %v", fp, rerr)
}
}(f.Src)
}

// Do not remove AgentConfigYmlFile lock file if any error happened.
if err != nil {
return err
}

lockFp := paths.AgentConfigYmlFile() + ".lock"
if rerr := os.Remove(lockFp); rerr != nil {
log.Warnf("failed to remove file: %s, err: %v", lockFp, rerr)
}

return err
}

// shutdownCallback returns a callback function to be executing during shutdown once all processes are closed.
// this goes through runtime directory of agent and copies all the state files created by processes to new versioned
// home directory with updated process name to match new version.
Expand Down
111 changes: 111 additions & 0 deletions internal/pkg/agent/cleaner/cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cleaner

import (
"context"
"os"
"sync"
"time"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/fileutil"
)

// Wait interval.
// If the watchFile was not modified after this interval, then remove all the files in the removeFiles array
const defaultCleanWait = 15 * time.Minute

type Cleaner struct {
log *logp.Logger
watchFile string
removeFiles []string
cleanWait time.Duration

mx sync.Mutex
}

type OptionFunc func(c *Cleaner)

func New(log *logp.Logger, watchFile string, removeFiles []string, opts ...OptionFunc) *Cleaner {
c := &Cleaner{
log: log,
watchFile: watchFile,
removeFiles: removeFiles,
cleanWait: defaultCleanWait,
}

for _, opt := range opts {
opt(c)
}
return c
}

func WithCleanWait(cleanWait time.Duration) OptionFunc {
return func(c *Cleaner) {
c.cleanWait = cleanWait
}
}

func (c *Cleaner) Run(ctx context.Context) error {
wait, done, err := c.process()
if err != nil {
return err
}

if done {
return nil
}

t := time.NewTimer(wait)
defer t.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-t.C:
c.log.Debug("cleaner: timer triggered")
wait, done, err = c.process()
if err != nil {
return err
}

if done {
return nil
}
t.Reset(wait)
}
}
}

func (c *Cleaner) process() (wait time.Duration, done bool, err error) {
modTime, err := fileutil.GetModTime(c.watchFile)
if err != nil {
return
}

c.log.Debugf("cleaner: check file %s mod time: %v", c.watchFile, modTime)
curDur := time.Since(modTime)
if curDur > c.cleanWait {
c.log.Debugf("cleaner: file %s modification expired", c.watchFile)
c.deleteFiles()
return wait, true, nil
}
wait = c.cleanWait - curDur
return wait, false, nil
}

func (c *Cleaner) deleteFiles() {
c.log.Debugf("cleaner: delete files: %v", c.removeFiles)
c.mx.Lock()
defer c.mx.Unlock()
for _, fp := range c.removeFiles {
c.log.Debugf("cleaner: delete file: %v", fp)
err := os.Remove(fp)
if err != nil {
c.log.Warnf("cleaner: delete file %v failed: %v", fp, err)
}
}
}
Loading