Skip to content

Commit

Permalink
Fix PR and use control client instead of coord in handler
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-laterman committed Nov 16, 2022
1 parent b70f7c3 commit 394db46
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ type Uploader interface {
// When a Diagnostics action is received a full diagnostics bundle is taken and uploaded to fleet-server.
type Diagnostics struct {
log *logger.Logger
coord *coordinator.Coordinator // TODO use of coordinator or control server/client?
client client.Client
uploader Uploader
}

// NewDiagnostics returns a new Diagnostics handler.
func NewDiagnostics(log *logger.Logger, coord *coordinator.Coordinator, uploader Uploader) *Diagnostics {
return &Diagnostics{
log: log,
coord: coord,
client: client.New(),
uploader: uploader,
}
}
Expand All @@ -48,49 +48,17 @@ func (h *Diagnostics) Handle(ctx context.Context, a fleetapi.Action, ack acker.A
}

// Gather agent diagnostics
diagHooks := append(diagnostics.GlobalHooks(), h.coord.DiagnosticHooks()()...)
aDiag := make([]client.DiagnosticFileResult, 0, len(diagHooks))
for _, hook := range diagHooks {
if ctx.Err() != nil {
return ctx.Err()
}

p, ts := hook.Hook(ctx)
aDiag = append(aDiag, client.DiagnosticFileResult{
Name: hook.Name,
Filename: hook.Filename,
Description: hook.Description,
ContentType: hook.ContentType,
Content: p,
Generated: ts,
})
aDiag, err := h.client.DiagnosticAgent(ctx)
if err != nil {
return fmt.Errorf("unable to gather agent diagnostics: %w", err)
}

runtimeDiag := h.coord.PerformDiagnostics(ctx)
uDiag := make([]client.DiagnosticUnitResult, 0, len(runtimeDiag))
for _, diag := range runtimeDiag {
files := make([]client.DiagnosticFileResult, 0, diag.Results)
for _, f := range diag.Results {
files = append(files, client.DiagnosticFileResult{
Name: f.Name,
Filename: f.Filename,
Description: f.Description,
ContentType: f.ContentType,
Content: f.Content,
Generated: f.Generated.AsTime(),
})
}
uDiag = append(uDiag, client.DiagnosticUnitResult{
ComponentID: diag.Component.ID,
UnitID: diag.Unit.ID,
UnitType: diag.Unit.Type,
Err: diag.Err,
Results: files,
})
uDiag, err := h.client.DiagnosticUnits(ctx)
if err != nil {
return fmt.Errorf("unable to gather unit diagnostics: %w", err)
}

var b bytes.Buffer
err := diagnostics.ZipArchive(b, aDiag, uDiag) // TODO Do we want to pass a buffer/a reader around? or write the file to a temp dir and read (to avoid memory usage)? file usage may need more thought for containerized deployments
err = diagnostics.ZipArchive(&b, aDiag, uDiag) // TODO Do we want to pass a buffer/a reader around? or write the file to a temp dir and read (to avoid memory usage)? file usage may need more thought for containerized deployments
if err != nil {
return fmt.Errorf("error creating diagnostics bundle: %w", err)
}
Expand Down
9 changes: 7 additions & 2 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (

"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"go.elastic.co/apm"

"github.com/elastic/elastic-agent-client/v7/pkg/client"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
Expand Down Expand Up @@ -473,6 +474,10 @@ func (c *Coordinator) DiagnosticHooks() func() diagnostics.Hooks {
},
},
}
hooks = append(hooks, c.addLogHooks()...)
hooks = append(hooks, c.addServiceLogHooks()...)

return hooks
}
}

Expand Down Expand Up @@ -542,7 +547,7 @@ func (c *Coordinator) addServiceLogHooks() []diagnostics.Hook {
Name: "services log dir",
Filename: "services/",
ContentType: diagnostics.ContentTypeDirectory,
Hook: func(_ context.Context) []byte { return nil },
Hook: func(_ context.Context) ([]byte, time.Time) { return nil, time.Time{} },
}}
for _, spec := range c.specs.ServiceSpecs() {
if spec.Spec.Service.Log == nil || spec.Spec.Service.Log.Path == "" {
Expand Down
205 changes: 6 additions & 199 deletions internal/pkg/agent/cmd/diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,16 @@
package cmd

import (
"archive/zip"
"context"
stderrors "errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
"time"

"github.com/hashicorp/go-multierror"
"github.com/spf13/cobra"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/control/client"
"github.com/elastic/elastic-agent/internal/pkg/cli"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/internal/pkg/diagnostics"
)

func newDiagnosticsCommand(_ []string, streams *cli.IOStreams) *cobra.Command {
Expand Down Expand Up @@ -74,201 +66,16 @@ func diagnosticCmd(streams *cli.IOStreams, cmd *cobra.Command) error {
return fmt.Errorf("failed to fetch component/unit diagnostics: %w", err)
}

err = createZip(fileName, agentDiag, unitDiags)
if err != nil {
return fmt.Errorf("unable to create archive %q: %w", fileName, err)
}
fmt.Fprintf(streams.Out, "Created diagnostics archive %q\n", fileName)
fmt.Fprintln(streams.Out, "***** WARNING *****\nCreated archive may contain plain text credentials.\nEnsure that files in archive are redacted before sharing.\n*******************")
return nil
}

// createZip creates a zip archive with the passed fileName.
//
// The passed DiagnosticsInfo and AgentConfig data is written in the specified output format.
// Any local log files are collected and copied into the archive.
func createZip(fileName string, agentDiag []client.DiagnosticFileResult, unitDiags []client.DiagnosticUnitResult) error {
f, err := os.Create(fileName)
if err != nil {
return err
}
zw := zip.NewWriter(f)

// write all Elastic Agent diagnostics at the top level
for _, ad := range agentDiag {
zf, err := zw.Create(ad.Filename)
if err != nil {
return closeHandlers(err, zw, f)
}
_, err = zf.Write(ad.Content)
if err != nil {
return closeHandlers(err, zw, f)
}
}

// structure each unit into its own component directory
compDirs := make(map[string][]client.DiagnosticUnitResult)
for _, ud := range unitDiags {
compDir := strings.ReplaceAll(ud.ComponentID, "/", "-")
compDirs[compDir] = append(compDirs[compDir], ud)
}

// write each units diagnostics into its own directory
// layout becomes components/<component-id>/<unit-id>/<filename>
_, err = zw.Create("components/")
if err != nil {
return closeHandlers(err, zw, f)
}
for dirName, units := range compDirs {
_, err = zw.Create(fmt.Sprintf("components/%s/", dirName))
if err != nil {
return closeHandlers(err, zw, f)
}
for _, ud := range units {
unitDir := strings.ReplaceAll(strings.TrimPrefix(ud.UnitID, ud.ComponentID+"-"), "/", "-")
_, err = zw.Create(fmt.Sprintf("components/%s/%s/", dirName, unitDir))
if err != nil {
return closeHandlers(err, zw, f)
}
if ud.Err != nil {
w, err := zw.Create(fmt.Sprintf("components/%s/%s/error.txt", dirName, unitDir))
if err != nil {
return closeHandlers(err, zw, f)
}
_, err = w.Write([]byte(fmt.Sprintf("%s\n", ud.Err)))
if err != nil {
return closeHandlers(err, zw, f)
}
continue
}
for _, fr := range ud.Results {
w, err := zw.Create(fmt.Sprintf("components/%s/%s/%s", dirName, unitDir, fr.Name))
if err != nil {
return closeHandlers(err, zw, f)
}
_, err = w.Write(fr.Content)
if err != nil {
return closeHandlers(err, zw, f)
}
}
}
}

if err := zipLogs(zw); err != nil {
return closeHandlers(err, zw, f)
}

return closeHandlers(nil, zw, f)
}

// zipLogs walks paths.Logs() and copies the file structure into zw in "logs/"
func zipLogs(zw *zip.Writer) error {
_, err := zw.Create("logs/")
if err != nil {
return err
}

if err := collectServiceComponentsLogs(zw); err != nil {
return fmt.Errorf("failed to collect endpoint-security logs: %w", err)
}

// using Data() + "/logs", for some reason default paths/Logs() is the home dir...
logPath := filepath.Join(paths.Home(), "logs") + string(filepath.Separator)
return filepath.WalkDir(logPath, func(path string, d fs.DirEntry, fErr error) error {
if stderrors.Is(fErr, fs.ErrNotExist) {
return nil
}
if fErr != nil {
return fmt.Errorf("unable to walk log dir: %w", fErr)
}

// name is the relative dir/file name replacing any filepath seperators with /
// this will clean log names on windows machines and will nop on *nix
name := filepath.ToSlash(strings.TrimPrefix(path, logPath))
if name == "" {
return nil
}

if d.IsDir() {
_, err := zw.Create("logs/" + name + "/")
if err != nil {
return fmt.Errorf("unable to create log directory in archive: %w", err)
}
return nil
}

return saveLogs(name, path, zw)
})
}

func collectServiceComponentsLogs(zw *zip.Writer) error {
platform, err := component.LoadPlatformDetail()
if err != nil {
return fmt.Errorf("failed to gather system information: %w", err)
}
specs, err := component.LoadRuntimeSpecs(paths.Components(), platform)
if err != nil {
return fmt.Errorf("failed to detect inputs and outputs: %w", err)
}
for _, spec := range specs.ServiceSpecs() {
if spec.Spec.Service.Log == nil || spec.Spec.Service.Log.Path == "" {
// no log path set in specification
continue
}

logPath := filepath.Dir(spec.Spec.Service.Log.Path) + string(filepath.Separator)
err = filepath.WalkDir(logPath, func(path string, d fs.DirEntry, fErr error) error {
if fErr != nil {
if stderrors.Is(fErr, fs.ErrNotExist) {
return nil
}

return fmt.Errorf("unable to walk log directory %q for service input %s: %w", logPath, spec.InputType, fErr)
}

name := filepath.ToSlash(strings.TrimPrefix(path, logPath))
if name == "" {
return nil
}

if d.IsDir() {
return nil
}
defer f.Close()

return saveLogs("services/"+name, path, zw)
})
if err != nil {
return err
}
if err := diagnostics.ZipArchive(f, agentDiag, unitDiags); err != nil {
return fmt.Errorf("unable to create archive %q: %w", fileName, err)
}
fmt.Fprintf(streams.Out, "Created diagnostics archive %q\n", fileName)
fmt.Fprintln(streams.Out, "***** WARNING *****\nCreated archive may contain plain text credentials.\nEnsure that files in archive are redacted before sharing.\n*******************")
return nil
}

func saveLogs(name string, logPath string, zw *zip.Writer) error {
lf, err := os.Open(logPath)
if err != nil {
return fmt.Errorf("unable to open log file: %w", err)
}
zf, err := zw.Create("logs/" + name)
if err != nil {
return closeHandlers(fmt.Errorf("unable to create log file in archive: %w", err), lf)
}
_, err = io.Copy(zf, lf)
if err != nil {
return closeHandlers(fmt.Errorf("log file copy failed: %w", err), lf)
}

return lf.Close()
}

// closeHandlers will close all passed closers attaching any errors to the passed err and returning the result
func closeHandlers(err error, closers ...io.Closer) error {
var mErr *multierror.Error
mErr = multierror.Append(mErr, err)
for _, c := range closers {
if inErr := c.Close(); inErr != nil {
mErr = multierror.Append(mErr, inErr)
}
}
return mErr.ErrorOrNil()
}
6 changes: 2 additions & 4 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,7 @@ func run(override cfgOverrider, modifiers ...component.PlatformModifier) error {
_ = serverStopFn()
}()

diagHooks := diagnostics.GlobalHooks()
diagHooks = append(diagHooks, coord.DiagnosticHooks()...)
control := server.New(logger.Named("control"), agentInfo, coord, tracer, diagHooks)
control := server.New(logger.Named("control"), agentInfo, coord, tracer, diagnostics.GlobalHooks, coord.DiagnosticHooks())
// start the control listener
if err := control.Start(); err != nil {
return err
Expand Down Expand Up @@ -395,7 +393,7 @@ func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig)

cfg := mcfg.APM

// nolint:godox // the TODO is intentional
//nolint:godox // the TODO is intentional
// TODO(stn): Ideally, we'd use apmtransport.NewHTTPTransportOptions()
// but it doesn't exist today. Update this code once we have something
// available via the APM Go agent.
Expand Down
Loading

0 comments on commit 394db46

Please sign in to comment.