Add agent diagnostics action #1703

Merged
merged 40 commits into from
Jan 31, 2023
Changes from 36 commits
Commits
eddcfa9
Add agent diagnostics action
michel-laterman Oct 25, 2022
b70f7c3
Merge remote-tracking branch 'origin/main' into diagnostics-action
michel-laterman Nov 16, 2022
e9b59a2
Fix PR and use control client instead of coord in handler
michel-laterman Nov 16, 2022
ba94a2b
Change AckEvent construction from fleet acker to an action method
michel-laterman Nov 17, 2022
ef7038a
Fix linter and tests
michel-laterman Nov 17, 2022
5e2792a
Fix linter
michel-laterman Nov 21, 2022
8fa1bdc
Merge branch 'main' into diagnostics-action
michel-laterman Dec 12, 2022
267cb6c
Fix merge
michel-laterman Dec 12, 2022
ee1f787
Merge remote-tracking branch 'origin/main' into diagnostics-action
michel-laterman Dec 12, 2022
3a949f3
Fix tests
michel-laterman Dec 12, 2022
b316427
remove duplication when creating an ackevent for an action
michel-laterman Dec 12, 2022
427c7ab
Merge branch 'main' into diagnostics-action
michel-laterman Dec 15, 2022
b185b91
Merge remote-tracking branch 'origin/main' into diagnostics-action
michel-laterman Jan 3, 2023
cffeca6
Retry upload for non-context errors
michel-laterman Jan 4, 2023
5c404f0
Merge remote-tracking branch 'origin/main' into diagnostics-action
michel-laterman Jan 5, 2023
9e6cdb8
Fix linter
michel-laterman Jan 5, 2023
d46ce20
Merge remote-tracking branch 'origin/main' into diagnostics-action
michel-laterman Jan 23, 2023
4318754
review feedback, fix diagnostics acks
michel-laterman Jan 23, 2023
1a1a9d9
Add JSON deserialization, fix yaml
michel-laterman Jan 24, 2023
55797c3
Add debug messages to handler
michel-laterman Jan 25, 2023
3fa3e1b
Fix uploader implementation and handler bug
michel-laterman Jan 25, 2023
abe952e
Review feedback
michel-laterman Jan 25, 2023
aea2b95
Fix linter
michel-laterman Jan 25, 2023
db52a04
Change diag ack to use upload_id add dates to diag directories
michel-laterman Jan 26, 2023
214fb22
Add rate limiter to diagnostics action handler
michel-laterman Jan 27, 2023
78ba518
update config
michel-laterman Jan 27, 2023
2693ce2
Add changelog fragment, fix linter
michel-laterman Jan 27, 2023
b29e75f
changed file_id to upload_id, updated changelog
juliaElastic Jan 27, 2023
abd31a8
updated diagnostics file name
juliaElastic Jan 27, 2023
383a576
Revert hooks changes, move log collection to ZipArchive
michel-laterman Jan 28, 2023
a2c8631
Merge remote-tracking branch 'origin/main' into diagnostics-action
michel-laterman Jan 28, 2023
4546751
Cleanup and yaml redaction fix
michel-laterman Jan 28, 2023
3776773
handler and redact fixes
michel-laterman Jan 28, 2023
3f868d2
fixed storing action_id correctly in files index
juliaElastic Jan 30, 2023
3adfdaa
commit the ack
michel-laterman Jan 30, 2023
8d96c53
Diagnostics handler will use temp file
michel-laterman Jan 31, 2023
cc1cd33
Change to async handler, add panic recover
michel-laterman Jan 31, 2023
f97d6d7
fix linter
michel-laterman Jan 31, 2023
cfa3e96
Update internal/pkg/agent/application/actions/handlers/handler_action…
michel-laterman Jan 31, 2023
09293ff
build error out of recovered item
michel-laterman Jan 31, 2023
16 changes: 16 additions & 0 deletions _meta/config/common.p2.yml.tmpl
@@ -49,6 +49,22 @@ inputs:
# port: 6791
# # Metrics buffer endpoint
# buffer.enabled: false
# # Configuration for the diagnostics action handler
# diagnostics:
# # Rate limit for the action handler. Does not affect diagnostics collected through the CLI.
# limit:
# # Rate limit interval.
# interval: 1m
# # Rate limit burst.
# burst: 1
# # Configuration for the file-upload client. Client may retry failed requests with an exponential backoff.
# uploader:
# # Max retries allowed when uploading a chunk.
# max_retries: 10
# # Initial duration of the backoff.
# init_dur: 1s
# # Max duration of the backoff.
# max_dur: 1m

# # Allow fleet to reload its configuration locally on disk.
# # Notes: Only specific process configuration will be reloaded.
16 changes: 16 additions & 0 deletions _meta/config/common.reference.p2.yml.tmpl
@@ -125,6 +125,22 @@ inputs:
# port: 6791
# # Metrics buffer endpoint
# buffer.enabled: false
# # Configuration for the diagnostics action handler
# diagnostics:
# # Rate limit for the action handler. Does not affect diagnostics collected through the CLI.
# limit:
# # Rate limit interval.
# interval: 1m
# # Rate limit burst.
# burst: 1
# # Configuration for the file-upload client. Client may retry failed requests with an exponential backoff.
# uploader:
# # Max retries allowed when uploading a chunk.
# max_retries: 10
# # Initial duration of the backoff.
# init_dur: 1s
# # Max duration of the backoff.
# max_dur: 1m

# # Allow fleet to reload its configuration locally on disk.
# # Notes: Only specific process configuration and external input configurations will be reloaded.
@@ -0,0 +1,36 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; an 80ish-character description of the change.
summary: add diagnostics action handler

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
description: |
  Add support for the REQUEST_DIAGNOSTICS action.
  When this action is received the agent will collect a diagnostics bundle and
  upload it to fleet-server using the file upload APIs.
  The handler has a configurable rate limit in order to prevent DoS attacks.
  The uploader may retry failures with a configurable exponential backoff.

# Affected component; a word indicating the component this changeset affects.
component: diagnostics

# PR number; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 1703

# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: 1883
16 changes: 16 additions & 0 deletions elastic-agent.reference.yml
@@ -131,6 +131,22 @@ inputs:
# port: 6791
# # Metrics buffer endpoint
# buffer.enabled: false
# # Configuration for the diagnostics action handler
# diagnostics:
# # Rate limit for the action handler. Does not affect diagnostics collected through the CLI.
# limit:
# # Rate limit interval.
# interval: 1m
# # Rate limit burst.
# burst: 1
# # Configuration for the file-upload client. Client may retry failed requests with an exponential backoff.
# uploader:
# # Max retries allowed when uploading a chunk.
# max_retries: 10
# # Initial duration of the backoff.
# init_dur: 1s
# # Max duration of the backoff.
# max_dur: 1m

# # Allow fleet to reload its configuration locally on disk.
# # Notes: Only specific process configuration and external input configurations will be reloaded.
16 changes: 16 additions & 0 deletions elastic-agent.yml
@@ -55,6 +55,22 @@ inputs:
# port: 6791
# # Metrics buffer endpoint
# buffer.enabled: false
# # Configuration for the diagnostics action handler
# diagnostics:
# # Rate limit for the action handler. Does not affect diagnostics collected through the CLI.
# limit:
# # Rate limit interval.
# interval: 1m
# # Rate limit burst.
# burst: 1
# # Configuration for the file-upload client. Client may retry failed requests with an exponential backoff.
# uploader:
# # Max retries allowed when uploading a chunk.
# max_retries: 10
# # Initial duration of the backoff.
# init_dur: 1s
# # Max duration of the backoff.
# max_dur: 1m

# # Allow fleet to reload its configuration locally on disk.
# # Notes: Only specific process configuration will be reloaded.
@@ -0,0 +1,204 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package handlers

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
	"time"

	"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
	"github.com/elastic/elastic-agent/internal/pkg/agent/control/v2/client"
	"github.com/elastic/elastic-agent/internal/pkg/agent/control/v2/cproto"
	"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
	"github.com/elastic/elastic-agent/internal/pkg/diagnostics"
	"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
	"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
	"github.com/elastic/elastic-agent/pkg/core/logger"

	"golang.org/x/time/rate"
)

// ErrRateLimit is the rate limit error that is returned if the handler is run too often.
// This may occur if the user sends multiple diagnostics actions to an agent in a short duration
// or if the agent goes offline and retrieves multiple diagnostics actions.
// In either case the 1st action will succeed and the others will ack with the error.
var ErrRateLimit = fmt.Errorf("rate limit exceeded")

// Uploader is the interface used to upload a diagnostics bundle to fleet-server.
type Uploader interface {
	UploadDiagnostics(context.Context, string, string, int64, io.Reader) (string, error)
}

// Diagnostics is the handler to process Diagnostics actions.
// When a Diagnostics action is received a full diagnostics bundle is taken and uploaded to fleet-server.
type Diagnostics struct {
	log      *logger.Logger
	coord    *coordinator.Coordinator
	limiter  *rate.Limiter
	uploader Uploader
}

// NewDiagnostics returns a new Diagnostics handler.
func NewDiagnostics(log *logger.Logger, coord *coordinator.Coordinator, cfg config.Limit, uploader Uploader) *Diagnostics {
	return &Diagnostics{
		log:      log,
		coord:    coord,
		limiter:  rate.NewLimiter(rate.Every(cfg.Interval), cfg.Burst),
		uploader: uploader,
	}
}

// Handle processes the passed Diagnostics action.
func (h *Diagnostics) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker) error {
	h.log.Debugf("handlerDiagnostics: action '%+v' received", a)
	action, ok := a.(*fleetapi.ActionDiagnostics)
	if !ok {
		return fmt.Errorf("invalid type, expected ActionDiagnostics and received %T", a)
	}
	ts := time.Now().UTC()
	defer func() {
		ack.Ack(ctx, action) //nolint:errcheck // no path for a failed ack
		ack.Commit(ctx)      //nolint:errcheck //no path for failing a commit
	}()

	if !h.limiter.Allow() {
		action.Err = ErrRateLimit
		return ErrRateLimit
	}

	h.log.Debug("Gathering agent diagnostics.")
	aDiag, err := h.runHooks(ctx)
	if err != nil {
		action.Err = err
		return fmt.Errorf("unable to gather agent diagnostics: %w", err)
	}
	h.log.Debug("Gathering unit diagnostics.")
	uDiag := h.diagUnits(ctx)

	var r io.Reader
	// Attempt to create a temporary diagnostics file on disk in order to avoid loading a
	// potentially large file in memory.
	// If on-disk creation fails an in-memory buffer is used.
	f, s, err := h.diagFile(aDiag, uDiag)
Comment on lines +102 to +105

Contributor Author:
Using Amit's suggestion here to use an on-disk temp file and fall back to an in-memory buffer.

Member:
Awesome work to turn around on that feedback so quickly. Really appreciate it.

	if err != nil {
		var b bytes.Buffer
		h.log.Warnw("Diagnostics action unable to use temporary file, using buffer instead.", "err", err)
		var wBuf bytes.Buffer
		defer func() {
			if str := wBuf.String(); str != "" {
				h.log.Warn(str)
			}
		}()
		err := diagnostics.ZipArchive(&wBuf, &b, aDiag, uDiag)
		if err != nil {
			action.Err = err
			return fmt.Errorf("error creating diagnostics bundle: %w", err)
		}
		r = &b
		s = int64(b.Len())
	} else {
		defer func() {
			f.Close()
			os.Remove(f.Name())
		}()
		r = f
	}
	h.log.Debug("Sending diagnostics archive.")
	uploadID, err := h.uploader.UploadDiagnostics(ctx, action.ActionID, ts.Format("2006-01-02T15-04-05Z07-00"), s, r) // RFC3339 format that uses - instead of : so it works on Windows
	action.Err = err
	action.UploadID = uploadID
	if err != nil {
		return fmt.Errorf("unable to upload diagnostics: %w", err)
	}
	h.log.Debugf("Diagnostics action '%+v' complete.", a)
	return nil
}

func (h *Diagnostics) runHooks(ctx context.Context) ([]client.DiagnosticFileResult, error) {
	hooks := append(h.coord.DiagnosticHooks(), diagnostics.GlobalHooks()...)
	diags := make([]client.DiagnosticFileResult, 0, len(hooks))
	for _, hook := range hooks {
		if ctx.Err() != nil {
			return diags, ctx.Err()
		}
		diags = append(diags, client.DiagnosticFileResult{
			Name:        hook.Name,
			Filename:    hook.Filename,
			Description: hook.Description,
			ContentType: hook.ContentType,
			Content:     hook.Hook(ctx),
			Generated:   time.Now().UTC(),
		})
	}
	return diags, nil
}

func (h *Diagnostics) diagUnits(ctx context.Context) []client.DiagnosticUnitResult {
	uDiag := make([]client.DiagnosticUnitResult, 0)
	rr := h.coord.PerformDiagnostics(ctx)
	for _, r := range rr {
		diag := client.DiagnosticUnitResult{
			ComponentID: r.Component.ID,
			UnitID:      r.Unit.ID,
			UnitType:    cproto.UnitType(r.Unit.Type),
		}
		if r.Err != nil {
			diag.Err = r.Err
		} else {
			results := make([]client.DiagnosticFileResult, 0, len(r.Results))
			for _, res := range r.Results {
				results = append(results, client.DiagnosticFileResult{
					Name:        res.Name,
					Filename:    res.Filename,
					Description: res.Description,
					ContentType: res.ContentType,
					Content:     res.Content,
					Generated:   res.Generated.AsTime(),
				})
			}
			diag.Results = results
		}
		uDiag = append(uDiag, diag)
	}
	return uDiag
}

// diagFile will write the diagnostics to a temporary file and return the file ready to be read
func (h *Diagnostics) diagFile(aDiag []client.DiagnosticFileResult, uDiag []client.DiagnosticUnitResult) (*os.File, int64, error) {
	f, err := os.CreateTemp("", "elastic-agent-diagnostics")
	if err != nil {
		return nil, 0, err
	}

	name := f.Name()
	var wBuf bytes.Buffer
	defer func() {
		if str := wBuf.String(); str != "" {
			h.log.Warn(str)
		}
	}()
	if err := diagnostics.ZipArchive(&wBuf, f, aDiag, uDiag); err != nil {
		os.Remove(name)
		return nil, 0, err
	}
	f.Sync()

	_, err = f.Seek(0, 0)
	if err != nil {
		os.Remove(name)
		return nil, 0, err
	}

	fi, err := f.Stat()
	if err != nil {
		os.Remove(name)
		return nil, 0, err
	}
	return f, fi.Size(), nil
}
@@ -293,7 +293,7 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str
 	return nil
 }
 
-// AckUpgrade performs acknowledgement for upgrade.
+// AckUpgrade is the method used on startup to ack a previously successful upgrade action.
 func (c *Coordinator) AckUpgrade(ctx context.Context, acker acker.Acker) error {
 	return c.upgradeMgr.Ack(ctx, acker)
 }
@@ -53,6 +53,10 @@ func (m *mockAction) String() string {
	args := m.Called()
	return args.String(0)
}
func (m *mockAction) AckEvent() fleetapi.AckEvent {
	args := m.Called()
	return args.Get(0).(fleetapi.AckEvent)
}
func (m *mockScheduledAction) StartTime() (time.Time, error) {
	args := m.Called()
	return args.Get(0).(time.Time), args.Error(1)
11 changes: 11 additions & 0 deletions internal/pkg/agent/application/managed_mode.go
@@ -25,6 +25,7 @@ import (
	"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/lazy"
	"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/retrier"
	fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
	"github.com/elastic/elastic-agent/internal/pkg/fleetapi/uploader"
	"github.com/elastic/elastic-agent/internal/pkg/queue"
	"github.com/elastic/elastic-agent/internal/pkg/remote"
	"github.com/elastic/elastic-agent/internal/pkg/runner"
@@ -348,6 +349,16 @@ func (m *managedConfigManager) initDispatcher(canceller context.CancelFunc) *han
		),
	)

	m.dispatcher.MustRegister(
		&fleetapi.ActionDiagnostics{},
		handlers.NewDiagnostics(
			m.log,
			m.coord,
			m.cfg.Settings.MonitoringConfig.Diagnostics.Limit,
			uploader.New(m.agentInfo.AgentID(), m.client, m.cfg.Settings.MonitoringConfig.Diagnostics.Uploader),
		),
	)

	m.dispatcher.MustRegister(
		&fleetapi.ActionApp{},
		handlers.NewAppAction(m.log, m.coord),
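
The registration above pairs an action type with its handler. The dispatcher's internals are not part of this diff, so the following is only a guess at the general pattern: a registry keyed by the action's concrete type. Every type and method name in it is hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
)

// action and handler are hypothetical, trimmed-down interfaces for this sketch.
type action interface{ ID() string }

type handler interface{ Handle(a action) error }

// handlerFunc adapts a plain function to the handler interface.
type handlerFunc func(a action) error

func (f handlerFunc) Handle(a action) error { return f(a) }

type diagnosticsAction struct{ id string }

func (a *diagnosticsAction) ID() string { return a.id }

type otherAction struct{ id string }

func (a *otherAction) ID() string { return a.id }

// dispatcher routes an action to the handler registered for its concrete type.
type dispatcher struct {
	handlers map[reflect.Type]handler
}

func newDispatcher() *dispatcher {
	return &dispatcher{handlers: make(map[reflect.Type]handler)}
}

// register associates the concrete type of a with h; duplicates are an error.
func (d *dispatcher) register(a action, h handler) error {
	t := reflect.TypeOf(a)
	if _, ok := d.handlers[t]; ok {
		return fmt.Errorf("handler for %s already registered", t)
	}
	d.handlers[t] = h
	return nil
}

// mustRegister panics on a registration error, which suits start-up wiring.
func (d *dispatcher) mustRegister(a action, h handler) {
	if err := d.register(a, h); err != nil {
		panic(err)
	}
}

// dispatch looks up the handler by the action's concrete type and runs it.
func (d *dispatcher) dispatch(a action) error {
	h, ok := d.handlers[reflect.TypeOf(a)]
	if !ok {
		return fmt.Errorf("no handler for %T", a)
	}
	return h.Handle(a)
}

func main() {
	d := newDispatcher()
	d.mustRegister(&diagnosticsAction{}, handlerFunc(func(a action) error {
		fmt.Println("handling diagnostics action", a.ID())
		return nil
	}))
	fmt.Println(d.dispatch(&diagnosticsAction{id: "action-1"}))
	fmt.Println(d.dispatch(&otherAction{id: "action-2"}))
}
```

Passing a zero value such as `&diagnosticsAction{}` at registration time only supplies the type; the value itself is never used, which mirrors the `&fleetapi.ActionDiagnostics{}` argument in the diff.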