Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring HTTP downloader progress reporter to accept multiple observers #3542

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ import (
"strings"
"time"

"github.com/docker/go-units"

"github.com/elastic/elastic-agent-libs/atomic"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

const (
Expand All @@ -46,13 +44,13 @@ const (

// Downloader is a downloader able to fetch artifacts from elastic.co web page.
type Downloader struct {
log progressLogger
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I always prefer loggers to be interfaces, for mocking and loose coupling, instead of relying on a more concrete type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I agree but I'm also a fan of YAGNI. We can easily change this back to an interface if/when we need it, i.e. have more than one implementation.

log *logger.Logger
config *artifact.Config
client http.Client
}

// NewDownloader creates and configures Elastic Downloader
func NewDownloader(log progressLogger, config *artifact.Config) (*Downloader, error) {
func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, error) {
client, err := config.HTTPTransportSettings.Client(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: false, IdleConnTimeout: 30 * time.Second},
Expand All @@ -66,7 +64,7 @@ func NewDownloader(log progressLogger, config *artifact.Config) (*Downloader, er
}

// NewDownloaderWithClient creates Elastic Downloader with specific client used
func NewDownloaderWithClient(log progressLogger, config *artifact.Config, client http.Client) *Downloader {
func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client) *Downloader {
return &Downloader{
log: log,
config: config,
Expand Down Expand Up @@ -208,152 +206,16 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f
}
}

reportCtx, reportCancel := context.WithCancel(ctx)
dp := newDownloadProgressReporter(e.log, sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize)
dp.Report(reportCtx)
loggingObserver := newLoggingProgressObserver(e.log, e.config.HTTPTransportSettings.Timeout)
dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver)
dp.Report(ctx)
_, err = io.Copy(destinationFile, io.TeeReader(resp.Body, dp))
if err != nil {
reportCancel()
dp.ReportFailed(err)
// return path, file already exists and needs to be cleaned up
return fullPath, errors.New(err, "copying fetched package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
}
reportCancel()
dp.ReportComplete()

return fullPath, nil
}

// downloadProgressReporter counts the bytes written to it and logs the progress,
// completion, or failure of a single download through its progressLogger.
type downloadProgressReporter struct {
// log receives every progress message (Infof, and Warnf once warnTimeout has passed).
log progressLogger
// sourceURI is the download source named in every log message.
sourceURI string
// interval is the period of the ticker that drives periodic progress messages in Report.
interval time.Duration
// warnTimeout is the elapsed time after which messages are also logged at warn level.
warnTimeout time.Duration
// length is the expected download size in bytes; values <= 0 are treated as unknown.
length float64

// downloaded is the running total of bytes passed to Write.
downloaded atomic.Int
// started is set by Report and used to compute elapsed time and transfer speed.
started time.Time
}

// newDownloadProgressReporter builds a reporter for a download from sourceURI that logs
// through log.
//
// The reporting interval is timeout scaled by downloadProgressIntervalPercentage, falling
// back to downloadProgressMinInterval when that product is not positive. The warn threshold
// is timeout scaled by warningProgressIntervalPercentage. length is the expected size of the
// download in bytes; the reporting methods treat a value <= 0 as "size unknown".
func newDownloadProgressReporter(log progressLogger, sourceURI string, timeout time.Duration, length int) *downloadProgressReporter {
	interval := time.Duration(float64(timeout) * downloadProgressIntervalPercentage)
	if interval <= 0 {
		// time.NewTicker panics on a non-positive duration, so a negative timeout must
		// fall back to the minimum interval just like a zero one does.
		interval = downloadProgressMinInterval
	}

	return &downloadProgressReporter{
		log:         log,
		sourceURI:   sourceURI,
		interval:    interval,
		warnTimeout: time.Duration(float64(timeout) * warningProgressIntervalPercentage),
		length:      float64(length),
	}
}

// Write satisfies io.Writer. It never stores or inspects the data; it only adds len(b) to
// the running byte counter and reports the whole slice as written, so it cannot fail.
func (dp *downloadProgressReporter) Write(b []byte) (int, error) {
	written := len(b)
	dp.downloaded.Add(written)
	return written, nil
}

// Report records the start time of the download and launches a goroutine that logs the
// download progress every dp.interval. The goroutine exits when ctx is done.
//
// Each message is logged at info level; once the elapsed time reaches dp.warnTimeout the
// same message is additionally logged at warn level.
func (dp *downloadProgressReporter) Report(ctx context.Context) {
	started := time.Now()
	dp.started = started
	sourceURI := dp.sourceURI
	log := dp.log
	length := dp.length
	warnTimeout := dp.warnTimeout
	interval := dp.interval

	go func() {
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				timePast := time.Since(started)
				downloaded := float64(dp.downloaded.Load())

				// Use fractional seconds: dividing the Duration by time.Second truncates
				// to whole seconds, which yields a division by zero (+Inf/NaN speed)
				// during the first second and an inflated speed afterwards.
				var bytesPerSecond float64
				if secs := timePast.Seconds(); secs > 0 {
					bytesPerSecond = downloaded / secs
				}

				var msg string
				var args []interface{}
				if length > 0 {
					// length of the download is known, so more detail can be provided
					percentComplete := downloaded / length * 100.0
					msg = "download progress from %s is %s/%s (%.2f%% complete) @ %sps"
					args = []interface{}{
						sourceURI, units.HumanSize(downloaded), units.HumanSize(length), percentComplete, units.HumanSize(bytesPerSecond),
					}
				} else {
					// length unknown so provide the amount downloaded and the speed
					msg = "download progress from %s has fetched %s @ %sps"
					args = []interface{}{
						sourceURI, units.HumanSize(downloaded), units.HumanSize(bytesPerSecond),
					}
				}

				log.Infof(msg, args...)
				if timePast >= warnTimeout {
					// duplicate to warn when over the warnTimeout; this still has it logging to info that way if
					// they are filtering the logs to info they still see the messages when over the warnTimeout, but
					// when filtering only by warn they see these messages only
					log.Warnf(msg, args...)
				}
			}
		}
	}()
}

// ReportComplete logs that the download finished, including the time elapsed since
// dp.started (set by Report) and the average transfer speed.
//
// The message is logged at info level; when the elapsed time has reached dp.warnTimeout it
// is additionally logged at warn level, so that it is visible when filtering by warn only.
func (dp *downloadProgressReporter) ReportComplete() {
	timePast := time.Since(dp.started)
	downloaded := float64(dp.downloaded.Load())

	// Use fractional seconds: dividing the Duration by time.Second truncates to whole
	// seconds, so a download finishing in under a second divided by zero (+Inf/NaN speed)
	// and longer ones reported an inflated speed.
	var bytesPerSecond float64
	if secs := timePast.Seconds(); secs > 0 {
		bytesPerSecond = downloaded / secs
	}

	msg := "download from %s completed in %s @ %sps"
	args := []interface{}{
		dp.sourceURI, units.HumanDuration(timePast), units.HumanSize(bytesPerSecond),
	}
	dp.log.Infof(msg, args...)
	if timePast >= dp.warnTimeout {
		dp.log.Warnf(msg, args...)
	}
}

// ReportFailed logs that the download failed with err, including how much had been
// fetched and the average transfer speed since dp.started (set by Report). When the
// expected length is known (dp.length > 0) the message also carries the total size and the
// percentage completed.
//
// The message is logged at info level; when the elapsed time has reached dp.warnTimeout it
// is additionally logged at warn level, so that it is visible when filtering by warn only.
func (dp *downloadProgressReporter) ReportFailed(err error) {
	timePast := time.Since(dp.started)
	downloaded := float64(dp.downloaded.Load())

	// Use fractional seconds: dividing the Duration by time.Second truncates to whole
	// seconds, so a failure within the first second divided by zero (+Inf/NaN speed) and
	// later ones reported an inflated speed.
	var bytesPerSecond float64
	if secs := timePast.Seconds(); secs > 0 {
		bytesPerSecond = downloaded / secs
	}

	var msg string
	var args []interface{}
	if dp.length > 0 {
		// length of the download is known, so more detail can be provided
		percentComplete := downloaded / dp.length * 100.0
		msg = "download from %s failed at %s/%s (%.2f%% complete) @ %sps: %s"
		args = []interface{}{
			dp.sourceURI, units.HumanSize(downloaded), units.HumanSize(dp.length), percentComplete, units.HumanSize(bytesPerSecond), err,
		}
	} else {
		// length unknown so provide the amount downloaded and the speed
		msg = "download from %s failed at %s @ %sps: %s"
		args = []interface{}{
			dp.sourceURI, units.HumanSize(downloaded), units.HumanSize(bytesPerSecond), err,
		}
	}
	dp.log.Infof(msg, args...)
	if timePast >= dp.warnTimeout {
		dp.log.Warnf(msg, args...)
	}
}

// progressLogger is the minimal logging surface downloadProgressReporter depends on.
// The reporter only ever calls Infof and Warnf, so any logger providing those two
// printf-style methods satisfies it.
type progressLogger interface {
	Warnf(format string, args ...interface{})
	Infof(format string, args ...interface{})
}
Loading
Loading