Skip to content

Commit

Permalink
make download of upgrade artifacts async
Browse files Browse the repository at this point in the history
- handle upgrade in separate go routine
- increase download timeout to 2 hours
- add KeepAliveSettings
- set idle connection timeout to 30 sec

Closes elastic#1706
Duplicate elastic#1666
  • Loading branch information
leehinman committed Mar 8, 2023
1 parent 0841871 commit 9f83a62
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: async download of elastic-agent upgrade artifacts during fleet managed upgrade

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; a word indicating the component this changeset affects.
component: agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package handlers
import (
"context"
"fmt"
"sync"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
Expand All @@ -18,8 +19,10 @@ import (
// After running Upgrade agent should download its own version specified by action
// from repository specified by fleet.
type Upgrade struct {
log *logger.Logger
coord *coordinator.Coordinator
log *logger.Logger
coord *coordinator.Coordinator
bkgAction *fleetapi.ActionUpgrade
m sync.Mutex
}

// NewUpgrade creates a new Upgrade handler.
Expand All @@ -31,13 +34,34 @@ func NewUpgrade(log *logger.Logger, coord *coordinator.Coordinator) *Upgrade {
}

// Handle handles UPGRADE action.
func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, _ acker.Acker) error {
func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker) error {
h.log.Debugf("handlerUpgrade: action '%+v' received", a)
action, ok := a.(*fleetapi.ActionUpgrade)
if !ok {
return fmt.Errorf("invalid type, expected ActionUpgrade and received %T", a)
}

// always perform PGP checks
return h.coord.Upgrade(ctx, action.Version, action.SourceURI, action, false)
go func() {
h.m.Lock()
if h.bkgAction != nil {
h.log.Infof("upgrade to version %s already running in background", h.bkgAction.Version)
h.m.Unlock()
return
}
h.bkgAction = action
h.m.Unlock()
h.log.Infof("starting upgrade to version %s in background", action.Version)
if err := h.coord.Upgrade(ctx, action.Version, action.SourceURI, action, false); err != nil {
h.log.Errorf("upgrade to version %s failed: %v", action.Version, err)
if err := ack.Ack(ctx, action); err != nil {
h.log.Errorf("ack of failed upgrade failed: %v", err)
}
if err := ack.Commit(ctx); err != nil {
h.log.Errorf("commit of ack for failed upgrade failed: %v", err)
}
}
h.m.Lock()
h.bkgAction = nil
h.m.Unlock()
}()
return nil
}
13 changes: 5 additions & 8 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ import (
"github.com/elastic/elastic-agent/pkg/core/logger"
)

var (
// ErrNotUpgradable error is returned when upgrade cannot be performed.
ErrNotUpgradable = errors.New(
"cannot be upgraded; must be installed with install sub-command and " +
"running under control of the systems supervisor")
)
// ErrNotUpgradable error is returned when upgrade cannot be performed.
var ErrNotUpgradable = errors.New(
"cannot be upgraded; must be installed with install sub-command and " +
"running under control of the systems supervisor")

// ReExecManager provides an interface to perform re-execution of the entire agent.
type ReExecManager interface {
Expand Down Expand Up @@ -119,8 +117,7 @@ type ConfigChange interface {
}

// ErrorReporter provides an interface for any manager that is handled by the coordinator to report errors.
type ErrorReporter interface {
}
type ErrorReporter interface{}

// ConfigManager provides an interface to run and watch for configuration changes.
type ConfigManager interface {
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/application/upgrade/artifact/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func DefaultConfig() *Config {
transport := httpcommon.DefaultHTTPTransportSettings()

// Elastic Agent binary is rather large and based on the network bandwidth it could take some time
// to download the full file. 10 minutes is a very large value, but we really want it to finish.
// to download the full file. 120 minutes is a very large value, but we really want it to finish.
// The HTTP download will log progress in the case that it is taking a while to download.
transport.Timeout = 10 * time.Minute
transport.Timeout = 120 * time.Minute

return &Config{
SourceURI: DefaultSourceURI,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

const (
packagePermissions = 0660
packagePermissions = 0o660

// downloadProgressIntervalPercentage defines how often to report the current download progress when percentage
// of time has passed in the overall interval for the complete download to complete. 5% is a good default, as
Expand All @@ -51,6 +51,7 @@ type Downloader struct {
func NewDownloader(log progressLogger, config *artifact.Config) (*Downloader, error) {
client, err := config.HTTPTransportSettings.Client(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: false, IdleConnTimeout: 30 * time.Second},
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -173,7 +174,7 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f
}

if destinationDir := filepath.Dir(fullPath); destinationDir != "" && destinationDir != "." {
if err := os.MkdirAll(destinationDir, 0755); err != nil {
if err := os.MkdirAll(destinationDir, 0o755); err != nil {
return "", err
}
}
Expand Down
20 changes: 8 additions & 12 deletions internal/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,14 @@ const (
runDirMod = 0770
)

var (
agentArtifact = artifact.Artifact{
Name: "Elastic Agent",
Cmd: agentName,
Artifact: "beats/" + agentName,
}
)
var agentArtifact = artifact.Artifact{
Name: "Elastic Agent",
Cmd: agentName,
Artifact: "beats/" + agentName,
}

var (
// ErrSameVersion error is returned when the upgrade results in the same installed version.
ErrSameVersion = errors.New("upgrade did not occur because its the same version")
)
// ErrSameVersion error is returned when the upgrade results in the same installed version.
var ErrSameVersion = errors.New("upgrade did not occur because its the same version")

// Upgrader performs an upgrade
type Upgrader struct {
Expand Down Expand Up @@ -249,7 +245,7 @@ func copyActionStore(log *logger.Logger, newHash string) error {
return err
}

if err := os.WriteFile(newActionStorePath, currentActionStore, 0600); err != nil {
if err := os.WriteFile(newActionStorePath, currentActionStore, 0o600); err != nil {
return err
}
}
Expand Down

0 comments on commit 9f83a62

Please sign in to comment.