Skip to content

Commit

Permalink
Fix PR and use control client instead of coord in handler
Browse files Browse the repository at this point in the history
Use the control.Client interface instead of a reference to the
coordinator so that less boilerplate code is used. Fix callback setups.
Cleanup uploader structs as the API has simplified a little.
  • Loading branch information
michel-laterman committed Nov 17, 2022
1 parent b70f7c3 commit e3ecfab
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 298 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"fmt"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/control/client"
"github.com/elastic/elastic-agent/internal/pkg/diagnostics"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
Expand All @@ -26,15 +25,15 @@ type Uploader interface {
// When a Diagnostics action is received a full diagnostics bundle is taken and uploaded to fleet-server.
type Diagnostics struct {
log *logger.Logger
coord *coordinator.Coordinator // TODO use of coordinator or control server/client?
client client.Client
uploader Uploader
}

// NewDiagnostics returns a new Diagnostics handler.
func NewDiagnostics(log *logger.Logger, coord *coordinator.Coordinator, uploader Uploader) *Diagnostics {
func NewDiagnostics(log *logger.Logger, uploader Uploader) *Diagnostics {
return &Diagnostics{
log: log,
coord: coord,
client: client.New(),
uploader: uploader,
}
}
Expand All @@ -48,49 +47,17 @@ func (h *Diagnostics) Handle(ctx context.Context, a fleetapi.Action, ack acker.A
}

// Gather agent diagnostics
diagHooks := append(diagnostics.GlobalHooks(), h.coord.DiagnosticHooks()()...)
aDiag := make([]client.DiagnosticFileResult, 0, len(diagHooks))
for _, hook := range diagHooks {
if ctx.Err() != nil {
return ctx.Err()
}

p, ts := hook.Hook(ctx)
aDiag = append(aDiag, client.DiagnosticFileResult{
Name: hook.Name,
Filename: hook.Filename,
Description: hook.Description,
ContentType: hook.ContentType,
Content: p,
Generated: ts,
})
aDiag, err := h.client.DiagnosticAgent(ctx)
if err != nil {
return fmt.Errorf("unable to gather agent diagnostics: %w", err)
}

runtimeDiag := h.coord.PerformDiagnostics(ctx)
uDiag := make([]client.DiagnosticUnitResult, 0, len(runtimeDiag))
for _, diag := range runtimeDiag {
files := make([]client.DiagnosticFileResult, 0, diag.Results)
for _, f := range diag.Results {
files = append(files, client.DiagnosticFileResult{
Name: f.Name,
Filename: f.Filename,
Description: f.Description,
ContentType: f.ContentType,
Content: f.Content,
Generated: f.Generated.AsTime(),
})
}
uDiag = append(uDiag, client.DiagnosticUnitResult{
ComponentID: diag.Component.ID,
UnitID: diag.Unit.ID,
UnitType: diag.Unit.Type,
Err: diag.Err,
Results: files,
})
uDiag, err := h.client.DiagnosticUnits(ctx)
if err != nil {
return fmt.Errorf("unable to gather unit diagnostics: %w", err)
}

var b bytes.Buffer
err := diagnostics.ZipArchive(b, aDiag, uDiag) // TODO Do we want to pass a buffer/a reader around? or write the file to a temp dir and read (to avoid memory usage)? file usage may need more thought for containerized deployments
err = diagnostics.ZipArchive(&b, aDiag, uDiag) // TODO Do we want to pass a buffer/a reader around? or write the file to a temp dir and read (to avoid memory usage)? file usage may need more thought for containerized deployments
if err != nil {
return fmt.Errorf("error creating diagnostics bundle: %w", err)
}
Expand Down
9 changes: 7 additions & 2 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (

"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"go.elastic.co/apm"

"github.com/elastic/elastic-agent-client/v7/pkg/client"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
Expand Down Expand Up @@ -473,6 +474,10 @@ func (c *Coordinator) DiagnosticHooks() func() diagnostics.Hooks {
},
},
}
hooks = append(hooks, c.addLogHooks()...)
hooks = append(hooks, c.addServiceLogHooks()...)

return hooks
}
}

Expand Down Expand Up @@ -542,7 +547,7 @@ func (c *Coordinator) addServiceLogHooks() []diagnostics.Hook {
Name: "services log dir",
Filename: "services/",
ContentType: diagnostics.ContentTypeDirectory,
Hook: func(_ context.Context) []byte { return nil },
Hook: func(_ context.Context) ([]byte, time.Time) { return nil, time.Time{} },
}}
for _, spec := range c.specs.ServiceSpecs() {
if spec.Spec.Service.Log == nil || spec.Spec.Service.Log.Path == "" {
Expand Down
205 changes: 6 additions & 199 deletions internal/pkg/agent/cmd/diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,16 @@
package cmd

import (
"archive/zip"
"context"
stderrors "errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
"time"

"github.com/hashicorp/go-multierror"
"github.com/spf13/cobra"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/control/client"
"github.com/elastic/elastic-agent/internal/pkg/cli"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/internal/pkg/diagnostics"
)

func newDiagnosticsCommand(_ []string, streams *cli.IOStreams) *cobra.Command {
Expand Down Expand Up @@ -74,201 +66,16 @@ func diagnosticCmd(streams *cli.IOStreams, cmd *cobra.Command) error {
return fmt.Errorf("failed to fetch component/unit diagnostics: %w", err)
}

err = createZip(fileName, agentDiag, unitDiags)
if err != nil {
return fmt.Errorf("unable to create archive %q: %w", fileName, err)
}
fmt.Fprintf(streams.Out, "Created diagnostics archive %q\n", fileName)
fmt.Fprintln(streams.Out, "***** WARNING *****\nCreated archive may contain plain text credentials.\nEnsure that files in archive are redacted before sharing.\n*******************")
return nil
}

// createZip creates a zip archive with the passed fileName.
//
// The passed DiagnosticsInfo and AgentConfig data is written in the specified output format.
// Any local log files are collected and copied into the archive.
func createZip(fileName string, agentDiag []client.DiagnosticFileResult, unitDiags []client.DiagnosticUnitResult) error {
f, err := os.Create(fileName)
if err != nil {
return err
}
zw := zip.NewWriter(f)

// write all Elastic Agent diagnostics at the top level
for _, ad := range agentDiag {
zf, err := zw.Create(ad.Filename)
if err != nil {
return closeHandlers(err, zw, f)
}
_, err = zf.Write(ad.Content)
if err != nil {
return closeHandlers(err, zw, f)
}
}

// structure each unit into its own component directory
compDirs := make(map[string][]client.DiagnosticUnitResult)
for _, ud := range unitDiags {
compDir := strings.ReplaceAll(ud.ComponentID, "/", "-")
compDirs[compDir] = append(compDirs[compDir], ud)
}

// write each units diagnostics into its own directory
// layout becomes components/<component-id>/<unit-id>/<filename>
_, err = zw.Create("components/")
if err != nil {
return closeHandlers(err, zw, f)
}
for dirName, units := range compDirs {
_, err = zw.Create(fmt.Sprintf("components/%s/", dirName))
if err != nil {
return closeHandlers(err, zw, f)
}
for _, ud := range units {
unitDir := strings.ReplaceAll(strings.TrimPrefix(ud.UnitID, ud.ComponentID+"-"), "/", "-")
_, err = zw.Create(fmt.Sprintf("components/%s/%s/", dirName, unitDir))
if err != nil {
return closeHandlers(err, zw, f)
}
if ud.Err != nil {
w, err := zw.Create(fmt.Sprintf("components/%s/%s/error.txt", dirName, unitDir))
if err != nil {
return closeHandlers(err, zw, f)
}
_, err = w.Write([]byte(fmt.Sprintf("%s\n", ud.Err)))
if err != nil {
return closeHandlers(err, zw, f)
}
continue
}
for _, fr := range ud.Results {
w, err := zw.Create(fmt.Sprintf("components/%s/%s/%s", dirName, unitDir, fr.Name))
if err != nil {
return closeHandlers(err, zw, f)
}
_, err = w.Write(fr.Content)
if err != nil {
return closeHandlers(err, zw, f)
}
}
}
}

if err := zipLogs(zw); err != nil {
return closeHandlers(err, zw, f)
}

return closeHandlers(nil, zw, f)
}

// zipLogs walks paths.Logs() and copies the file structure into zw in "logs/"
func zipLogs(zw *zip.Writer) error {
_, err := zw.Create("logs/")
if err != nil {
return err
}

if err := collectServiceComponentsLogs(zw); err != nil {
return fmt.Errorf("failed to collect endpoint-security logs: %w", err)
}

// using Data() + "/logs", for some reason default paths/Logs() is the home dir...
logPath := filepath.Join(paths.Home(), "logs") + string(filepath.Separator)
return filepath.WalkDir(logPath, func(path string, d fs.DirEntry, fErr error) error {
if stderrors.Is(fErr, fs.ErrNotExist) {
return nil
}
if fErr != nil {
return fmt.Errorf("unable to walk log dir: %w", fErr)
}

// name is the relative dir/file name replacing any filepath seperators with /
// this will clean log names on windows machines and will nop on *nix
name := filepath.ToSlash(strings.TrimPrefix(path, logPath))
if name == "" {
return nil
}

if d.IsDir() {
_, err := zw.Create("logs/" + name + "/")
if err != nil {
return fmt.Errorf("unable to create log directory in archive: %w", err)
}
return nil
}

return saveLogs(name, path, zw)
})
}

func collectServiceComponentsLogs(zw *zip.Writer) error {
platform, err := component.LoadPlatformDetail()
if err != nil {
return fmt.Errorf("failed to gather system information: %w", err)
}
specs, err := component.LoadRuntimeSpecs(paths.Components(), platform)
if err != nil {
return fmt.Errorf("failed to detect inputs and outputs: %w", err)
}
for _, spec := range specs.ServiceSpecs() {
if spec.Spec.Service.Log == nil || spec.Spec.Service.Log.Path == "" {
// no log path set in specification
continue
}

logPath := filepath.Dir(spec.Spec.Service.Log.Path) + string(filepath.Separator)
err = filepath.WalkDir(logPath, func(path string, d fs.DirEntry, fErr error) error {
if fErr != nil {
if stderrors.Is(fErr, fs.ErrNotExist) {
return nil
}

return fmt.Errorf("unable to walk log directory %q for service input %s: %w", logPath, spec.InputType, fErr)
}

name := filepath.ToSlash(strings.TrimPrefix(path, logPath))
if name == "" {
return nil
}

if d.IsDir() {
return nil
}
defer f.Close()

return saveLogs("services/"+name, path, zw)
})
if err != nil {
return err
}
if err := diagnostics.ZipArchive(f, agentDiag, unitDiags); err != nil {
return fmt.Errorf("unable to create archive %q: %w", fileName, err)
}
fmt.Fprintf(streams.Out, "Created diagnostics archive %q\n", fileName)
fmt.Fprintln(streams.Out, "***** WARNING *****\nCreated archive may contain plain text credentials.\nEnsure that files in archive are redacted before sharing.\n*******************")
return nil
}

func saveLogs(name string, logPath string, zw *zip.Writer) error {
lf, err := os.Open(logPath)
if err != nil {
return fmt.Errorf("unable to open log file: %w", err)
}
zf, err := zw.Create("logs/" + name)
if err != nil {
return closeHandlers(fmt.Errorf("unable to create log file in archive: %w", err), lf)
}
_, err = io.Copy(zf, lf)
if err != nil {
return closeHandlers(fmt.Errorf("log file copy failed: %w", err), lf)
}

return lf.Close()
}

// closeHandlers will close all passed closers attaching any errors to the passed err and returning the result
func closeHandlers(err error, closers ...io.Closer) error {
var mErr *multierror.Error
mErr = multierror.Append(mErr, err)
for _, c := range closers {
if inErr := c.Close(); inErr != nil {
mErr = multierror.Append(mErr, inErr)
}
}
return mErr.ErrorOrNil()
}
6 changes: 2 additions & 4 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,7 @@ func run(override cfgOverrider, modifiers ...component.PlatformModifier) error {
_ = serverStopFn()
}()

diagHooks := diagnostics.GlobalHooks()
diagHooks = append(diagHooks, coord.DiagnosticHooks()...)
control := server.New(logger.Named("control"), agentInfo, coord, tracer, diagHooks)
control := server.New(logger.Named("control"), agentInfo, coord, tracer, diagnostics.GlobalHooks, coord.DiagnosticHooks())
// start the control listener
if err := control.Start(); err != nil {
return err
Expand Down Expand Up @@ -395,7 +393,7 @@ func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig)

cfg := mcfg.APM

// nolint:godox // the TODO is intentional
//nolint:godox // the TODO is intentional
// TODO(stn): Ideally, we'd use apmtransport.NewHTTPTransportOptions()
// but it doesn't exist today. Update this code once we have something
// available via the APM Go agent.
Expand Down
Loading

0 comments on commit e3ecfab

Please sign in to comment.