From 394db4633baa6bdcf46a254653121bf6651f06b4 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Wed, 16 Nov 2022 13:49:50 -0800 Subject: [PATCH] Fix PR and use control client instead of coord in handler --- .../handlers/handler_action_diagnostics.go | 50 +---- .../application/coordinator/coordinator.go | 9 +- internal/pkg/agent/cmd/diagnostics.go | 205 +----------------- internal/pkg/agent/cmd/run.go | 6 +- internal/pkg/agent/control/server/server.go | 35 +-- internal/pkg/diagnostics/diagnostics.go | 5 +- internal/pkg/fleetapi/action.go | 10 + 7 files changed, 58 insertions(+), 262 deletions(-) diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_diagnostics.go b/internal/pkg/agent/application/actions/handlers/handler_action_diagnostics.go index f5158892752..47f70c3e2d0 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_diagnostics.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_diagnostics.go @@ -26,7 +26,7 @@ type Uploader interface { // When a Diagnostics action is received a full diagnostics bundle is taken and uploaded to fleet-server. type Diagnostics struct { log *logger.Logger - coord *coordinator.Coordinator // TODO use of coordinator or control server/client? + client client.Client uploader Uploader } @@ -34,7 +34,7 @@ type Diagnostics struct { func NewDiagnostics(log *logger.Logger, coord *coordinator.Coordinator, uploader Uploader) *Diagnostics { return &Diagnostics{ log: log, - coord: coord, + client: client.New(), uploader: uploader, } } @@ -48,49 +48,17 @@ func (h *Diagnostics) Handle(ctx context.Context, a fleetapi.Action, ack acker.A } // Gather agent diagnostics - diagHooks := append(diagnostics.GlobalHooks(), h.coord.DiagnosticHooks()()...) - aDiag := make([]client.DiagnosticFileResult, 0, len(diagHooks)) - for _, hook := range diagHooks { - if ctx.Err() != nil { - return ctx.Err() - } - - p, ts := hook.Hook(ctx) - aDiag = append(aDiag, client.DiagnosticFileResult{ - Name: hook.Name, - Filename: hook.Filename, - Description: hook.Description, - ContentType: hook.ContentType, - Content: p, - Generated: ts, - }) + aDiag, err := h.client.DiagnosticAgent(ctx) + if err != nil { + return fmt.Errorf("unable to gather agent diagnostics: %w", err) } - - runtimeDiag := h.coord.PerformDiagnostics(ctx) - uDiag := make([]client.DiagnosticUnitResult, 0, len(runtimeDiag)) - for _, diag := range runtimeDiag { - files := make([]client.DiagnosticFileResult, 0, diag.Results) - for _, f := range diag.Results { - files = append(files, client.DiagnosticFileResult{ - Name: f.Name, - Filename: f.Filename, - Description: f.Description, - ContentType: f.ContentType, - Content: f.Content, - Generated: f.Generated.AsTime(), - }) - } - uDiag = append(uDiag, client.DiagnosticUnitResult{ - ComponentID: diag.Component.ID, - UnitID: diag.Unit.ID, - UnitType: diag.Unit.Type, - Err: diag.Err, - Results: files, - }) + uDiag, err := h.client.DiagnosticUnits(ctx) + if err != nil { + return fmt.Errorf("unable to gather unit diagnostics: %w", err) } var b bytes.Buffer - err := diagnostics.ZipArchive(b, aDiag, uDiag) // TODO Do we want to pass a buffer/a reader around? or write the file to a temp dir and read (to avoid memory usage)? file usage may need more thought for containerized deployments + err = diagnostics.ZipArchive(&b, aDiag, uDiag) // TODO Do we want to pass a buffer/a reader around? or write the file to a temp dir and read (to avoid memory usage)? file usage may need more thought for containerized deployments if err != nil { return fmt.Errorf("error creating diagnostics bundle: %w", err) } diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 8302dc39406..7d897a7c676 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -17,9 +17,10 @@ import ( "gopkg.in/yaml.v2" - "github.com/elastic/elastic-agent-client/v7/pkg/client" "go.elastic.co/apm" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" @@ -473,6 +474,10 @@ func (c *Coordinator) DiagnosticHooks() func() diagnostics.Hooks { }, }, } + hooks = append(hooks, c.addLogHooks()...) + hooks = append(hooks, c.addServiceLogHooks()...) + + return hooks } } @@ -542,7 +547,7 @@ func (c *Coordinator) addServiceLogHooks() []diagnostics.Hook { Name: "services log dir", Filename: "services/", ContentType: diagnostics.ContentTypeDirectory, - Hook: func(_ context.Context) []byte { return nil }, + Hook: func(_ context.Context) ([]byte, time.Time) { return nil, time.Time{} }, }} for _, spec := range c.specs.ServiceSpecs() { if spec.Spec.Service.Log == nil || spec.Spec.Service.Log.Path == "" { diff --git a/internal/pkg/agent/cmd/diagnostics.go b/internal/pkg/agent/cmd/diagnostics.go index 9fab842375e..2d548849d15 100644 --- a/internal/pkg/agent/cmd/diagnostics.go +++ b/internal/pkg/agent/cmd/diagnostics.go @@ -5,24 +5,16 @@ package cmd import ( - "archive/zip" "context" - stderrors "errors" "fmt" - "io" - "io/fs" "os" - "path/filepath" - "strings" "time" - "github.com/hashicorp/go-multierror" "github.com/spf13/cobra" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/control/client" "github.com/elastic/elastic-agent/internal/pkg/cli" - "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/internal/pkg/diagnostics" ) func newDiagnosticsCommand(_ []string, streams *cli.IOStreams) *cobra.Command { @@ -74,201 +66,16 @@ func diagnosticCmd(streams *cli.IOStreams, cmd *cobra.Command) error { return fmt.Errorf("failed to fetch component/unit diagnostics: %w", err) } - err = createZip(fileName, agentDiag, unitDiags) - if err != nil { - return fmt.Errorf("unable to create archive %q: %w", fileName, err) - } - fmt.Fprintf(streams.Out, "Created diagnostics archive %q\n", fileName) - fmt.Fprintln(streams.Out, "***** WARNING *****\nCreated archive may contain plain text credentials.\nEnsure that files in archive are redacted before sharing.\n*******************") - return nil -} - -// createZip creates a zip archive with the passed fileName. -// -// The passed DiagnosticsInfo and AgentConfig data is written in the specified output format. -// Any local log files are collected and copied into the archive. -func createZip(fileName string, agentDiag []client.DiagnosticFileResult, unitDiags []client.DiagnosticUnitResult) error { f, err := os.Create(fileName) if err != nil { return err } - zw := zip.NewWriter(f) - - // write all Elastic Agent diagnostics at the top level - for _, ad := range agentDiag { - zf, err := zw.Create(ad.Filename) - if err != nil { - return closeHandlers(err, zw, f) - } - _, err = zf.Write(ad.Content) - if err != nil { - return closeHandlers(err, zw, f) - } - } - - // structure each unit into its own component directory - compDirs := make(map[string][]client.DiagnosticUnitResult) - for _, ud := range unitDiags { - compDir := strings.ReplaceAll(ud.ComponentID, "/", "-") - compDirs[compDir] = append(compDirs[compDir], ud) - } - - // write each units diagnostics into its own directory - // layout becomes components/// - _, err = zw.Create("components/") - if err != nil { - return closeHandlers(err, zw, f) - } - for dirName, units := range compDirs { - _, err = zw.Create(fmt.Sprintf("components/%s/", dirName)) - if err != nil { - return closeHandlers(err, zw, f) - } - for _, ud := range units { - unitDir := strings.ReplaceAll(strings.TrimPrefix(ud.UnitID, ud.ComponentID+"-"), "/", "-") - _, err = zw.Create(fmt.Sprintf("components/%s/%s/", dirName, unitDir)) - if err != nil { - return closeHandlers(err, zw, f) - } - if ud.Err != nil { - w, err := zw.Create(fmt.Sprintf("components/%s/%s/error.txt", dirName, unitDir)) - if err != nil { - return closeHandlers(err, zw, f) - } - _, err = w.Write([]byte(fmt.Sprintf("%s\n", ud.Err))) - if err != nil { - return closeHandlers(err, zw, f) - } - continue - } - for _, fr := range ud.Results { - w, err := zw.Create(fmt.Sprintf("components/%s/%s/%s", dirName, unitDir, fr.Name)) - if err != nil { - return closeHandlers(err, zw, f) - } - _, err = w.Write(fr.Content) - if err != nil { - return closeHandlers(err, zw, f) - } - } - } - } - - if err := zipLogs(zw); err != nil { - return closeHandlers(err, zw, f) - } - - return closeHandlers(nil, zw, f) -} - -// zipLogs walks paths.Logs() and copies the file structure into zw in "logs/" -func zipLogs(zw *zip.Writer) error { - _, err := zw.Create("logs/") - if err != nil { - return err - } - - if err := collectServiceComponentsLogs(zw); err != nil { - return fmt.Errorf("failed to collect endpoint-security logs: %w", err) - } - - // using Data() + "/logs", for some reason default paths/Logs() is the home dir... - logPath := filepath.Join(paths.Home(), "logs") + string(filepath.Separator) - return filepath.WalkDir(logPath, func(path string, d fs.DirEntry, fErr error) error { - if stderrors.Is(fErr, fs.ErrNotExist) { - return nil - } - if fErr != nil { - return fmt.Errorf("unable to walk log dir: %w", fErr) - } - - // name is the relative dir/file name replacing any filepath seperators with / - // this will clean log names on windows machines and will nop on *nix - name := filepath.ToSlash(strings.TrimPrefix(path, logPath)) - if name == "" { - return nil - } - - if d.IsDir() { - _, err := zw.Create("logs/" + name + "/") - if err != nil { - return fmt.Errorf("unable to create log directory in archive: %w", err) - } - return nil - } - - return saveLogs(name, path, zw) - }) -} - -func collectServiceComponentsLogs(zw *zip.Writer) error { - platform, err := component.LoadPlatformDetail() - if err != nil { - return fmt.Errorf("failed to gather system information: %w", err) - } - specs, err := component.LoadRuntimeSpecs(paths.Components(), platform) - if err != nil { - return fmt.Errorf("failed to detect inputs and outputs: %w", err) - } - for _, spec := range specs.ServiceSpecs() { - if spec.Spec.Service.Log == nil || spec.Spec.Service.Log.Path == "" { - // no log path set in specification - continue - } - - logPath := filepath.Dir(spec.Spec.Service.Log.Path) + string(filepath.Separator) - err = filepath.WalkDir(logPath, func(path string, d fs.DirEntry, fErr error) error { - if fErr != nil { - if stderrors.Is(fErr, fs.ErrNotExist) { - return nil - } - - return fmt.Errorf("unable to walk log directory %q for service input %s: %w", logPath, spec.InputType, fErr) - } - - name := filepath.ToSlash(strings.TrimPrefix(path, logPath)) - if name == "" { - return nil - } - - if d.IsDir() { - return nil - } + defer f.Close() - return saveLogs("services/"+name, path, zw) - }) - if err != nil { - return err - } + if err := diagnostics.ZipArchive(f, agentDiag, unitDiags); err != nil { + return fmt.Errorf("unable to create archive %q: %w", fileName, err) } + fmt.Fprintf(streams.Out, "Created diagnostics archive %q\n", fileName) + fmt.Fprintln(streams.Out, "***** WARNING *****\nCreated archive may contain plain text credentials.\nEnsure that files in archive are redacted before sharing.\n*******************") return nil } - -func saveLogs(name string, logPath string, zw *zip.Writer) error { - lf, err := os.Open(logPath) - if err != nil { - return fmt.Errorf("unable to open log file: %w", err) - } - zf, err := zw.Create("logs/" + name) - if err != nil { - return closeHandlers(fmt.Errorf("unable to create log file in archive: %w", err), lf) - } - _, err = io.Copy(zf, lf) - if err != nil { - return closeHandlers(fmt.Errorf("log file copy failed: %w", err), lf) - } - - return lf.Close() -} - -// closeHandlers will close all passed closers attaching any errors to the passed err and returning the result -func closeHandlers(err error, closers ...io.Closer) error { - var mErr *multierror.Error - mErr = multierror.Append(mErr, err) - for _, c := range closers { - if inErr := c.Close(); inErr != nil { - mErr = multierror.Append(mErr, inErr) - } - } - return mErr.ErrorOrNil() -} diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index e6f9ec8d0f7..903d59fb21b 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -178,9 +178,7 @@ func run(override cfgOverrider, modifiers ...component.PlatformModifier) error { _ = serverStopFn() }() - diagHooks := diagnostics.GlobalHooks() - diagHooks = append(diagHooks, coord.DiagnosticHooks()...) - control := server.New(logger.Named("control"), agentInfo, coord, tracer, diagHooks) + control := server.New(logger.Named("control"), agentInfo, coord, tracer, diagnostics.GlobalHooks, coord.DiagnosticHooks()) // start the control listener if err := control.Start(); err != nil { return err @@ -395,7 +393,7 @@ func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig) cfg := mcfg.APM - // nolint:godox // the TODO is intentional + //nolint:godox // the TODO is intentional // TODO(stn): Ideally, we'd use apmtransport.NewHTTPTransportOptions() // but it doesn't exist today. Update this code once we have something // available via the APM Go agent. diff --git a/internal/pkg/agent/control/server/server.go b/internal/pkg/agent/control/server/server.go index 7e6b6799bbb..665dd6abea1 100644 --- a/internal/pkg/agent/control/server/server.go +++ b/internal/pkg/agent/control/server/server.go @@ -32,23 +32,23 @@ import ( type Server struct { cproto.UnimplementedElasticAgentControlServer - logger *logger.Logger - agentInfo *info.AgentInfo - coord *coordinator.Coordinator - listener net.Listener - server *grpc.Server - tracer *apm.Tracer - diagHooks diagnostics.Hooks + logger *logger.Logger + agentInfo *info.AgentInfo + coord *coordinator.Coordinator + listener net.Listener + server *grpc.Server + tracer *apm.Tracer + diagHooksFn []func() diagnostics.Hooks } // New creates a new control protocol server. -func New(log *logger.Logger, agentInfo *info.AgentInfo, coord *coordinator.Coordinator, tracer *apm.Tracer, diagHooks diagnostics.Hooks) *Server { +func New(log *logger.Logger, agentInfo *info.AgentInfo, coord *coordinator.Coordinator, tracer *apm.Tracer, diagHooksFn ...func() diagnostics.Hooks) *Server { return &Server{ - logger: log, - agentInfo: agentInfo, - coord: coord, - tracer: tracer, - diagHooks: diagHooks, + logger: log, + agentInfo: agentInfo, + coord: coord, + tracer: tracer, + diagHooksFn: diagHooksFn, } } @@ -180,8 +180,13 @@ func (s *Server) Upgrade(ctx context.Context, request *cproto.UpgradeRequest) (* // DiagnosticAgent returns diagnostic information for this running Elastic Agent. func (s *Server) DiagnosticAgent(ctx context.Context, _ *cproto.DiagnosticAgentRequest) (*cproto.DiagnosticAgentResponse, error) { - res := make([]*cproto.DiagnosticFileResult, 0, len(s.diagHooks)) - for _, h := range s.diagHooks { + diagHooks := make([]diagnostics.Hook, 0) + for _, fn := range s.diagHooksFn { + hooks := fn() + diagHooks = append(diagHooks, hooks...) + } + res := make([]*cproto.DiagnosticFileResult, 0, len(diagHooks)) + for _, h := range diagHooks { if ctx.Err() != nil { return nil, ctx.Err() } diff --git a/internal/pkg/diagnostics/diagnostics.go b/internal/pkg/diagnostics/diagnostics.go index cb629496ef1..75bcbd9232e 100644 --- a/internal/pkg/diagnostics/diagnostics.go +++ b/internal/pkg/diagnostics/diagnostics.go @@ -20,6 +20,9 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/release" ) +// ContentTypeDirectory should be used to indicate that a directory should be made in the resulting bundle +const ContentTypeDirectory = "directory" + // Hook is a hook that gets used when diagnostic information is requested from the Elastic Agent. type Hook struct { Name string @@ -107,7 +110,7 @@ func pprofDiag(name string) func(context.Context) ([]byte, time.Time) { } // ZipArchive creates a zipped diagnostics bundle using the passed writer with the passed diagnostics. -// If any error is encounted when writing the contents of the archive it is returned. +// If any error is encountered when writing the contents of the archive it is returned. func ZipArchive(w io.Writer, agentDiag []client.DiagnosticFileResult, unitDiags []client.DiagnosticUnitResult) error { zw := zip.NewWriter(w) defer zw.Close() diff --git a/internal/pkg/fleetapi/action.go b/internal/pkg/fleetapi/action.go index 7de4a4b7d40..45cd2958b36 100644 --- a/internal/pkg/fleetapi/action.go +++ b/internal/pkg/fleetapi/action.go @@ -582,6 +582,16 @@ func (a *Actions) UnmarshalYAML(unmarshal func(interface{}) error) error { "fail to decode CANCEL_ACTION action", errors.TypeConfig) } + case ActionTypeDiagnostics: + action = &ActionDiagnostics{ + ActionID: n.ActionID, + ActionType: n.ActionType, + } + if err := json.Unmarshal(n.Data, action); err != nil { + return errors.New(err, + "fail to decode DIAGNOSTICS_ACTION action", + errors.TypeConfig) + } default: action = &ActionUnknown{ ActionID: n.ActionID,