From 2cc23382db4c89e605faf057b651a11cfe567638 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Tue, 2 Aug 2022 12:20:27 -0400 Subject: [PATCH] [v2] Fix inspect command (#805) * Write the inspect command for v2. * Fix lint. * Fix code review. Load inputs from inputs.d for inspect. * Fix lint. * Refactor to use errgroup. * Remove unused struct. --- internal/pkg/agent/application/application.go | 36 +- internal/pkg/agent/application/once.go | 6 +- .../pkg/agent/application/paths/common.go | 10 +- internal/pkg/agent/application/periodic.go | 6 +- internal/pkg/agent/cmd/diagnostics.go | 12 +- internal/pkg/agent/cmd/inspect.go | 538 ++++++++++-------- internal/pkg/agent/cmd/inspect_test.go | 53 -- internal/pkg/agent/configuration/settings.go | 5 - internal/pkg/agent/install/uninstall.go | 2 +- internal/pkg/composable/controller.go | 15 + internal/pkg/config/discover.go | 39 ++ internal/pkg/config/operations/inspector.go | 18 +- pkg/component/component.go | 17 +- pkg/component/error.go | 32 ++ pkg/component/load.go | 14 +- 15 files changed, 437 insertions(+), 366 deletions(-) delete mode 100644 internal/pkg/agent/cmd/inspect_test.go create mode 100644 internal/pkg/config/discover.go create mode 100644 pkg/component/error.go diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index bf0d0fd6444..6b3c4b73d42 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -6,7 +6,6 @@ package application import ( "fmt" - "path/filepath" "go.elastic.co/apm" @@ -20,17 +19,11 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/capabilities" "github.com/elastic/elastic-agent/internal/pkg/composable" "github.com/elastic/elastic-agent/internal/pkg/config" - "github.com/elastic/elastic-agent/internal/pkg/dir" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/component/runtime" "github.com/elastic/elastic-agent/pkg/core/logger" ) -type discoverFunc func() ([]string, error) - -// ErrNoConfiguration is returned when no configuration are found. -var ErrNoConfiguration = errors.New("no configuration found", errors.TypeConfig) - // New creates a new Agent and bootstrap the required subsystem. func New( log *logger.Logger, @@ -83,8 +76,8 @@ func New( if configuration.IsStandalone(cfg.Fleet) { log.Info("Parsed configuration and determined agent is managed locally") - loader := config.NewLoader(log, externalConfigsGlob()) - discover := discoverer(pathConfigFile, cfg.Settings.Path, externalConfigsGlob()) + loader := config.NewLoader(log, paths.ExternalInputs()) + discover := config.Discoverer(pathConfigFile, cfg.Settings.Path, paths.ExternalInputs()) if !cfg.Settings.Reload.Enabled { log.Debug("Reloading of configuration is off") configMgr = newOnce(log, discover, loader) @@ -173,28 +166,3 @@ func mergeFleetConfig(rawConfig *config.Config) (storage.Store, *configuration.C return store, cfg, nil } - -func externalConfigsGlob() string { - return filepath.Join(paths.Config(), configuration.ExternalInputsPattern) -} - -func discoverer(patterns ...string) discoverFunc { - p := make([]string, 0, len(patterns)) - for _, newP := range patterns { - if len(newP) == 0 { - continue - } - - p = append(p, newP) - } - - if len(p) == 0 { - return func() ([]string, error) { - return []string{}, ErrNoConfiguration - } - } - - return func() ([]string, error) { - return dir.DiscoverFiles(p...) - } -} diff --git a/internal/pkg/agent/application/once.go b/internal/pkg/agent/application/once.go index 7326612950b..fca0ed3e741 100644 --- a/internal/pkg/agent/application/once.go +++ b/internal/pkg/agent/application/once.go @@ -17,13 +17,13 @@ import ( type once struct { log *logger.Logger - discover discoverFunc + discover config.DiscoverFunc loader *config.Loader ch chan coordinator.ConfigChange errCh chan error } -func newOnce(log *logger.Logger, discover discoverFunc, loader *config.Loader) *once { +func newOnce(log *logger.Logger, discover config.DiscoverFunc, loader *config.Loader) *once { return &once{log: log, discover: discover, loader: loader, ch: make(chan coordinator.ConfigChange), errCh: make(chan error)} } @@ -34,7 +34,7 @@ func (o *once) Run(ctx context.Context) error { } if len(files) == 0 { - return ErrNoConfiguration + return config.ErrNoConfiguration } cfg, err := readfiles(files, o.loader) diff --git a/internal/pkg/agent/application/paths/common.go b/internal/pkg/agent/application/paths/common.go index 79b114144cc..8b6cc06743e 100644 --- a/internal/pkg/agent/application/paths/common.go +++ b/internal/pkg/agent/application/paths/common.go @@ -23,6 +23,9 @@ const ( tempSubdir = "tmp" ) +// ExternalInputsPattern is a glob that matches the paths of external configuration files. +var ExternalInputsPattern = filepath.Join("inputs.d", "*.yml") + var ( topPath string configPath string @@ -69,7 +72,7 @@ func TempDir() string { tmpDir := filepath.Join(Data(), tempSubdir) tmpCreator.Do(func() { // create tempdir as it probably don't exists - os.MkdirAll(tmpDir, 0750) + _ = os.MkdirAll(tmpDir, 0750) }) return tmpDir } @@ -119,6 +122,11 @@ func ConfigFile() string { return filepath.Join(Config(), configFilePath) } +// ExternalInputs returns the path to load external inputs from. +func ExternalInputs() string { + return filepath.Join(Config(), ExternalInputsPattern) +} + // Data returns the data directory for Agent func Data() string { if unversionedHome { diff --git a/internal/pkg/agent/application/periodic.go b/internal/pkg/agent/application/periodic.go index bb9f717a7af..e32234a4ca3 100644 --- a/internal/pkg/agent/application/periodic.go +++ b/internal/pkg/agent/application/periodic.go @@ -22,7 +22,7 @@ type periodic struct { period time.Duration watcher *filewatcher.Watch loader *config.Loader - discover discoverFunc + discover config.DiscoverFunc ch chan coordinator.ConfigChange errCh chan error } @@ -62,7 +62,7 @@ func (p *periodic) work() error { } if len(files) == 0 { - return ErrNoConfiguration + return config.ErrNoConfiguration } // Reset the state of the watched files @@ -115,7 +115,7 @@ func (p *periodic) work() error { func newPeriodic( log *logger.Logger, period time.Duration, - discover discoverFunc, + discover config.DiscoverFunc, loader *config.Loader, ) *periodic { w, err := filewatcher.New(log, filewatcher.DefaultComparer) diff --git a/internal/pkg/agent/cmd/diagnostics.go b/internal/pkg/agent/cmd/diagnostics.go index 811b88465d2..f267c2df162 100644 --- a/internal/pkg/agent/cmd/diagnostics.go +++ b/internal/pkg/agent/cmd/diagnostics.go @@ -398,6 +398,11 @@ func outputDiagnostics(w io.Writer, d DiagnosticsInfo) error { } func gatherConfig() (AgentConfig, error) { + log, err := newErrorLogger() + if err != nil { + return AgentConfig{}, err + } + cfg := AgentConfig{} localCFG, err := loadConfig(nil) if err != nil { @@ -405,7 +410,7 @@ func gatherConfig() (AgentConfig, error) { } cfg.ConfigLocal = localCFG - renderedCFG, err := operations.LoadFullAgentConfig(paths.ConfigFile(), true) + renderedCFG, err := operations.LoadFullAgentConfig(log, paths.ConfigFile(), true) if err != nil { return cfg, err } @@ -434,11 +439,6 @@ func gatherConfig() (AgentConfig, error) { return AgentConfig{}, err } - log, err := newErrorLogger() - if err != nil { - return AgentConfig{}, err - } - // Get process config - uses same approach as inspect output command. // Does not contact server process to request configs. pMap, err := getProgramsFromConfig(log, agentInfo, renderedCFG, isStandalone) diff --git a/internal/pkg/agent/cmd/inspect.go b/internal/pkg/agent/cmd/inspect.go index 0c51bb40460..32455a179c4 100644 --- a/internal/pkg/agent/cmd/inspect.go +++ b/internal/pkg/agent/cmd/inspect.go @@ -5,383 +5,437 @@ package cmd import ( + "context" + "fmt" + "os" + "strings" + "time" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" + "gopkg.in/yaml.v2" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/service" - "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" + "github.com/elastic/elastic-agent/internal/pkg/capabilities" "github.com/elastic/elastic-agent/internal/pkg/cli" + "github.com/elastic/elastic-agent/internal/pkg/composable" "github.com/elastic/elastic-agent/internal/pkg/config" + "github.com/elastic/elastic-agent/internal/pkg/config/operations" + "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/core/logger" ) func newInspectCommandWithArgs(s []string, streams *cli.IOStreams) *cobra.Command { cmd := &cobra.Command{ Use: "inspect", Short: "Shows configuration of the agent", - Long: "Shows current configuration of the agent", - Args: cobra.ExactArgs(0), + Long: `Shows current configuration of the agent. + +By default variable substitution is not performed. Use the --variables flag to enable variable substitution. The +first set of computed variables are used when only the --variables flag is defined. This can prevent some of the +dynamic providers (kubernetes, docker, etc.) from providing all the possible variables it could have discovered if given +more time. The --variables-wait allows an amount of time to be provided for variable discovery, when set it will +wait that amount of time before using the variables for the configuration. +`, + Args: cobra.ExactArgs(0), Run: func(c *cobra.Command, args []string) { - // TODO(blakerouse): Fix inspect command for Elastic Agent v2 - /* - if err := inspectConfig(paths.ConfigFile()); err != nil { - fmt.Fprintf(streams.Err, "Error: %v\n%s\n", err, troubleshootMessage()) - os.Exit(1) - } - */ + var opts inspectConfigOpts + opts.variables, _ = c.Flags().GetBool("variables") + opts.variablesWait, _ = c.Flags().GetDuration("variables-wait") + + ctx, cancel := context.WithCancel(context.Background()) + service.HandleSignals(func() {}, cancel) + if err := inspectConfig(ctx, paths.ConfigFile(), opts, streams); err != nil { + fmt.Fprintf(streams.Err, "Error: %v\n%s\n", err, troubleshootMessage()) + os.Exit(1) + } }, } - cmd.AddCommand(newInspectOutputCommandWithArgs(s)) + cmd.Flags().Bool("variables", false, "render configuration with variables substituted") + cmd.Flags().Duration("variables-wait", time.Duration(0), "wait this amount of time for variables before performing substitution") + + cmd.AddCommand(newInspectComponentsCommandWithArgs(s, streams)) return cmd } -func newInspectOutputCommandWithArgs(_ []string) *cobra.Command { +func newInspectComponentsCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command { cmd := &cobra.Command{ - Use: "output", - Short: "Displays configuration generated for output", - Long: "Displays configuration generated for output.\nIf no output is specified list of output is displayed", - Args: cobra.MaximumNArgs(2), - RunE: func(c *cobra.Command, args []string) error { - // TODO(blakerouse): Fix inspect command for Elastic Agent v2 - /* - outName, _ := c.Flags().GetString("output") - program, _ := c.Flags().GetString("program") - cfgPath := paths.ConfigFile() - agentInfo, err := info.NewAgentInfo(false) - if err != nil { - return err - } + Use: "components [id]", + Short: "Displays the components model for the configuration", + Long: `Displays the generated components model for the current configuration. + +By default the configuration for each unit inside of a component is not returned. Use --show-config to display the +configuration in all the units. + +A specific component can be selected by its ID and only that component and all its units will be returned. Because its +possible for a component to have many units the configuration for each unit is still not provided by default. Use +--show-config to display the configuration in all the units. + +A specific unit inside of a component can be selected by using / and only that unit will be +returned. In this mode the configuration is provided by default, using the --show-config is a noop. - if outName == "" { - return inspectOutputs(cfgPath, agentInfo) - } +The selected input or output runtime specification for a component is never provided unless enabled with --show-spec. - return inspectOutput(cfgPath, outName, program, agentInfo) - */ - return nil +Variable substitution is always performed when computing the components, and it cannot be disabled. By default only the +first set of computed variables are used. This can prevent some of the dynamic providers (kubernetes, docker, etc.) from +providing all the possible variables it could have discovered if given more time. The --variables-wait allows an +amount of time to be provided for variable discovery, when set it will wait that amount of time before using the +variables for the configuration. +`, + Args: cobra.MaximumNArgs(1), + Run: func(c *cobra.Command, args []string) { + var opts inspectComponentsOpts + if len(args) > 0 { + opts.id = args[0] + } + opts.showConfig, _ = c.Flags().GetBool("show-config") + opts.showSpec, _ = c.Flags().GetBool("show-spec") + opts.variablesWait, _ = c.Flags().GetDuration("variables-wait") + + ctx, cancel := context.WithCancel(context.Background()) + service.HandleSignals(func() {}, cancel) + if err := inspectComponents(ctx, paths.ConfigFile(), opts, streams); err != nil { + fmt.Fprintf(streams.Err, "Error: %v\n%s\n", err, troubleshootMessage()) + os.Exit(1) + } }, } - cmd.Flags().StringP("output", "o", "", "name of the output to be inspected") - cmd.Flags().StringP("program", "p", "", "type of program to inspect, needs to be combined with output. e.g filebeat") + cmd.Flags().Bool("show-config", false, "show the configuration for all units") + cmd.Flags().Bool("show-spec", false, "show the runtime specification for a component") + cmd.Flags().Duration("variables-wait", time.Duration(0), "wait this amount of time for variables before performing substitution") return cmd } -/* -func inspectConfig(cfgPath string) error { - err := tryContainerLoadPaths() - if err != nil { - return err - } +type inspectConfigOpts struct { + variables bool + variablesWait time.Duration +} - fullCfg, err := operations.LoadFullAgentConfig(cfgPath, true) +func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts, streams *cli.IOStreams) error { + err := tryContainerLoadPaths() if err != nil { return err } - return printConfig(fullCfg) -} - -func printMapStringConfig(mapStr map[string]interface{}) error { l, err := newErrorLogger() if err != nil { return err } - caps, err := capabilities.Load(paths.AgentCapabilitiesPath(), l) + + fullCfg, err := operations.LoadFullAgentConfig(l, cfgPath, true) if err != nil { return err } - newCfg, err := caps.Apply(mapStr) - if err != nil { - return errors.New(err, "failed to apply capabilities") + if !opts.variables { + return printConfig(fullCfg, l, streams) } - newMap, ok := newCfg.(map[string]interface{}) - if !ok { - return errors.New("config returned from capabilities has invalid type") + cfg, err := getConfigWithVariables(ctx, l, cfgPath, opts.variablesWait) + if err != nil { + return err } + return printMapStringConfig(cfg, streams) +} - data, err := yaml.Marshal(newMap) +func printMapStringConfig(mapStr map[string]interface{}, streams *cli.IOStreams) error { + data, err := yaml.Marshal(mapStr) if err != nil { return errors.New(err, "could not marshal to YAML") } - _, err = os.Stdout.WriteString(string(data)) + _, err = streams.Out.Write(data) return err } -func printConfig(cfg *config.Config) error { - mapStr, err := cfg.ToMapStr() +func printConfig(cfg *config.Config, l *logger.Logger, streams *cli.IOStreams) error { + caps, err := capabilities.Load(paths.AgentCapabilitiesPath(), l) if err != nil { return err } - return printMapStringConfig(mapStr) -} - -func newErrorLogger() (*logger.Logger, error) { - return logger.NewWithLogpLevel("", logp.ErrorLevel, false) -} - -func inspectOutputs(cfgPath string, agentInfo *info.AgentInfo) error { - l, err := newErrorLogger() + mapStr, err := cfg.ToMapStr() if err != nil { return err } - - fullCfg, err := operations.LoadFullAgentConfig(cfgPath, true) + newCfg, err := caps.Apply(mapStr) if err != nil { - return err + return errors.New(err, "failed to apply capabilities") } - - fleetConfig, err := fullCfg.ToMapStr() - if err != nil { - return err + newMap, ok := newCfg.(map[string]interface{}) + if !ok { + return errors.New("config returned from capabilities has invalid type") } - isStandalone, err := isStandalone(fullCfg) - if err != nil { - return err - } + return printMapStringConfig(newMap, streams) +} - return listOutputsFromMap(l, agentInfo, fleetConfig, isStandalone) +type inspectComponentsOpts struct { + id string + showConfig bool + showSpec bool + variablesWait time.Duration } -func listOutputsFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, cfg *config.Config, isStandalone bool) error { - programsGroup, err := getProgramsFromConfig(log, agentInfo, cfg, isStandalone) +func inspectComponents(ctx context.Context, cfgPath string, opts inspectComponentsOpts, streams *cli.IOStreams) error { + l, err := newErrorLogger() if err != nil { return err - - } - - for k := range programsGroup { - _, _ = os.Stdout.WriteString(k) } - return nil -} - -func listOutputsFromMap(log *logger.Logger, agentInfo *info.AgentInfo, cfg map[string]interface{}, isStandalone bool) error { - c, err := config.NewConfigFrom(cfg) + // Ensure that when running inside a container that the correct paths are used. + err = tryContainerLoadPaths() if err != nil { return err } - return listOutputsFromConfig(log, agentInfo, c, isStandalone) -} - -func inspectOutput(cfgPath, output, program string, agentInfo *info.AgentInfo) error { - l, err := newErrorLogger() + // Load the requirements before trying to load the configuration. These should always load + // even if the configuration is wrong. + platform, err := component.LoadPlatformDetail() if err != nil { - return err + return fmt.Errorf("failed to gather system information: %w", err) } - - fullCfg, err := operations.LoadFullAgentConfig(cfgPath, true) + specs, err := component.LoadRuntimeSpecs(paths.Components(), platform) if err != nil { - return err + return fmt.Errorf("failed to detect inputs and outputs: %w", err) } - fleetConfig, err := fullCfg.ToMapStr() + m, err := getConfigWithVariables(ctx, l, cfgPath, opts.variablesWait) if err != nil { return err } - return printOutputFromMap(l, agentInfo, output, program, fleetConfig, true) -} - -func printOutputFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, output, programName string, cfg *config.Config, isStandalone bool) error { - programsGroup, err := getProgramsFromConfig(log, agentInfo, cfg, isStandalone) + // Compute the components from the computed configuration. + comps, err := specs.ToComponents(m) if err != nil { - return err - + return fmt.Errorf("failed to render components: %w", err) } - for k, programs := range programsGroup { - if k != output { - continue + // ID provided. + if opts.id != "" { + splitID := strings.SplitN(opts.id, "/", 2) + compID := splitID[0] + unitID := "" + if len(splitID) > 1 { + unitID = splitID[1] } - - var programFound bool - for _, p := range programs { - if programName != "" && programName != p.Spec.CommandName() { - continue + comp, ok := findComponent(comps, compID) + if ok { + if unitID != "" { + unit, ok := findUnit(comp, unitID) + if ok { + return printUnit(unit, streams) + } + return fmt.Errorf("unable to find unit with ID: %s/%s", compID, unitID) } - - programFound = true - _, _ = os.Stdout.WriteString(fmt.Sprintf("[%s] %s:\n", k, p.Spec.CommandName())) - err = printMapStringConfig(p.Configuration()) - if err != nil { - return fmt.Errorf("cannot print configuration of program '%s': %w", programName, err) + if !opts.showSpec { + comp.Spec = component.InputRuntimeSpec{} } - _, _ = os.Stdout.WriteString("---") + if !opts.showConfig { + for key, unit := range comp.Units { + unit.Config = nil + comp.Units[key] = unit + } + } + return printComponent(comp, streams) } + return fmt.Errorf("unable to find component with ID: %s", compID) + } - if !programFound { - return fmt.Errorf("program '%s' is not recognized within output '%s', try running `elastic-agent inspect output` to find available outputs", - programName, - output) + // Hide configuration unless toggled on. + if !opts.showConfig { + for i, comp := range comps { + for key, unit := range comp.Units { + unit.Config = nil + comp.Units[key] = unit + } + comps[i] = comp } - - return nil } - return fmt.Errorf("output '%s' is not recognized, try running `elastic-agent inspect output` to find available outputs", output) - -} - -func printOutputFromMap(log *logger.Logger, agentInfo *info.AgentInfo, output, programName string, cfg map[string]interface{}, isStandalone bool) error { - c, err := config.NewConfigFrom(cfg) - if err != nil { - return err + // Hide runtime specification unless toggled on. + if !opts.showSpec { + for i, comp := range comps { + comp.Spec = component.InputRuntimeSpec{} + comps[i] = comp + } } - return printOutputFromConfig(log, agentInfo, output, programName, c, isStandalone) + return printComponents(comps, streams) } -func getProgramsFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, cfg *config.Config, isStandalone bool) (map[string][]program.Program, error) { - monitor := noop.NewMonitor() - router := &inmemRouter{} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - composableCtrl, err := composable.New(log, cfg) +func getConfigWithVariables(ctx context.Context, l *logger.Logger, cfgPath string, timeout time.Duration) (map[string]interface{}, error) { + caps, err := capabilities.Load(paths.AgentCapabilitiesPath(), l) if err != nil { - return nil, err - } - - composableWaiter := newWaitForCompose(composableCtrl) - configModifiers := &pipeline.ConfigModifiers{ - Decorators: []pipeline.DecoratorFunc{modifiers.InjectMonitoring}, - Filters: []pipeline.FilterFunc{filters.StreamChecker}, - } - - if !isStandalone { - sysInfo, err := sysinfo.Host() - if err != nil { - return nil, errors.New(err, - "fail to get system information", - errors.TypeUnexpected) - } - configModifiers.Filters = append(configModifiers.Filters, modifiers.InjectFleet(cfg, sysInfo.Info(), agentInfo)) + return nil, fmt.Errorf("failed to determine capabilities: %w", err) } - caps, err := capabilities.Load(paths.AgentCapabilitiesPath(), log) + cfg, err := operations.LoadFullAgentConfig(l, cfgPath, true) if err != nil { return nil, err } - - emit, err := emitter.New( - ctx, - log, - agentInfo, - composableWaiter, - router, - configModifiers, - caps, - monitor, - ) + m, err := cfg.ToMapStr() if err != nil { return nil, err } - - if err := emit(ctx, cfg); err != nil { - return nil, err + ast, err := transpiler.NewAST(m) + if err != nil { + return nil, fmt.Errorf("could not create the AST from the configuration: %w", err) } - composableWaiter.Wait() - // add the fleet-server input to default programs list - // this does not correspond to the actual config that fleet-server uses as it's in fleet.yml and not part of the assembled config (cfg) - fleetCFG, err := cfg.ToMapStr() + var ok bool + updatedAst, err := caps.Apply(ast) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to apply capabilities: %w", err) } - if fleetInput := getFleetInput(fleetCFG); fleetInput != nil { - ast, err := transpiler.NewAST(fleetInput) - if err != nil { - return nil, err - } - router.programs["default"] = append(router.programs["default"], program.Program{ - Spec: program.Spec{ - Name: "fleet-server", - Cmd: "fleet-server", - }, - Config: ast, - }) + ast, ok = updatedAst.(*transpiler.AST) + if !ok { + return nil, fmt.Errorf("failed to transform object returned from capabilities to AST: %w", err) } - return router.programs, nil -} - -func getFleetInput(o map[string]interface{}) map[string]interface{} { - arr, ok := o["inputs"].([]interface{}) - if !ok { - return nil + // Wait for the variables based on the timeout. + vars, err := waitForVariables(ctx, l, cfg, timeout) + if err != nil { + return nil, fmt.Errorf("failed to gather variables: %w", err) } - for _, iface := range arr { - input, ok := iface.(map[string]interface{}) - if !ok { - continue - } - t, ok := input["type"] - if !ok { - continue + + // Render the inputs using the discovered inputs. + inputs, ok := transpiler.Lookup(ast, "inputs") + if ok { + renderedInputs, err := transpiler.RenderInputs(inputs, vars) + if err != nil { + return nil, fmt.Errorf("rendering inputs failed: %w", err) } - if t.(string) == "fleet-server" { - return input + err = transpiler.Insert(ast, renderedInputs, "inputs") + if err != nil { + return nil, fmt.Errorf("inserting rendered inputs failed: %w", err) } } - return nil + m, err = ast.Map() + if err != nil { + return nil, fmt.Errorf("failed to convert ast to map[string]interface{}: %w", err) + } + return m, nil } -type inmemRouter struct { - programs map[string][]program.Program -} +func waitForVariables(ctx context.Context, l *logger.Logger, cfg *config.Config, wait time.Duration) ([]*transpiler.Vars, error) { + var cancel context.CancelFunc + var vars []*transpiler.Vars -func (r *inmemRouter) Routes() *sorted.Set { - return nil -} + composable, err := composable.New(l, cfg) + if err != nil { + return nil, fmt.Errorf("failed to create composable controller: %w", err) + } -func (r *inmemRouter) Route(_ context.Context, _ string, grpProg map[pipeline.RoutingKey][]program.Program) error { - r.programs = grpProg - return nil -} + hasTimeout := false + if wait > time.Duration(0) { + hasTimeout = true + ctx, cancel = context.WithTimeout(ctx, wait) + } else { + ctx, cancel = context.WithCancel(ctx) + } + defer cancel() -func (r *inmemRouter) Shutdown() {} + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + var err error + for { + select { + case <-ctx.Done(): + if err == nil { + err = ctx.Err() + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + err = nil + } + return err + case cErr := <-composable.Errors(): + err = cErr + if err != nil { + cancel() + } + case cVars := <-composable.Watch(): + vars = cVars + if !hasTimeout { + cancel() + } + } + } + }) -type waitForCompose struct { - controller composable.Controller - done chan bool -} + g.Go(func() error { + err := composable.Run(ctx) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + err = nil + } + return err + }) -func newWaitForCompose(wrapped composable.Controller) *waitForCompose { - return &waitForCompose{ - controller: wrapped, - done: make(chan bool), + err = g.Wait() + if err != nil { + return nil, err } + return vars, nil } -func (w *waitForCompose) Run(ctx context.Context) error { - err := w.controller.Run(ctx) +func printComponents(components []component.Component, streams *cli.IOStreams) error { + topLevel := struct { + Components []component.Component `yaml:"components"` + }{ + Components: components, + } + data, err := yaml.Marshal(topLevel) + if err != nil { + return errors.New(err, "could not marshal to YAML") + } + _, err = streams.Out.Write(data) return err } -func (w *waitForCompose) Errors() <-chan error { - return nil +func printComponent(comp component.Component, streams *cli.IOStreams) error { + data, err := yaml.Marshal(comp) + if err != nil { + return errors.New(err, "could not marshal to YAML") + } + _, err = streams.Out.Write(data) + return err } -func (w *waitForCompose) Watch() <-chan []*transpiler.Vars { - return nil +func printUnit(unit component.Unit, streams *cli.IOStreams) error { + data, err := yaml.Marshal(unit) + if err != nil { + return errors.New(err, "could not marshal to YAML") + } + _, err = streams.Out.Write(data) + return err } -func (w *waitForCompose) Wait() { - <-w.done +func findUnit(comp component.Component, id string) (component.Unit, bool) { + for _, unit := range comp.Units { + if unit.ID == id { + return unit, true + } + } + return component.Unit{}, false } -*/ -func isStandalone(cfg *config.Config) (bool, error) { - c, err := configuration.NewFromConfig(cfg) - if err != nil { - return false, err +func findComponent(components []component.Component, id string) (component.Component, bool) { + for _, comp := range components { + if comp.ID == id { + return comp, true + } } + return component.Component{}, false +} - return configuration.IsStandalone(c.Fleet), nil +func newErrorLogger() (*logger.Logger, error) { + return logger.NewWithLogpLevel("", logp.ErrorLevel, false) } diff --git a/internal/pkg/agent/cmd/inspect_test.go b/internal/pkg/agent/cmd/inspect_test.go deleted file mode 100644 index 3a5ffb35380..00000000000 --- a/internal/pkg/agent/cmd/inspect_test.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package cmd - -/* -import ( - "testing" -) - -func TestGetFleetInput(t *testing.T) { - tests := []struct { - name string - input map[string]interface{} - expect map[string]interface{} - }{{ - name: "fleet-server input found", - input: map[string]interface{}{ - "inputs": []map[string]interface{}{ - map[string]interface{}{ - "type": "fleet-server", - }}, - }, - expect: map[string]interface{}{ - "type": "fleet-server", - }, - }, { - name: "no fleet-server input", - input: map[string]interface{}{ - "inputs": []map[string]interface{}{ - map[string]interface{}{ - "type": "test-server", - }}, - }, - expect: nil, - }, { - name: "wrong input formant", - input: map[string]interface{}{ - "inputs": "example", - }, - expect: nil, - }} - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := getFleetInput(tt.input) - if tt.expect == nil && r != nil { - t.Error("expected nil") - } - }) - } -} -*/ diff --git a/internal/pkg/agent/configuration/settings.go b/internal/pkg/agent/configuration/settings.go index eab16a8177d..0a211101c4d 100644 --- a/internal/pkg/agent/configuration/settings.go +++ b/internal/pkg/agent/configuration/settings.go @@ -5,8 +5,6 @@ package configuration import ( - "path/filepath" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" @@ -14,9 +12,6 @@ import ( "github.com/elastic/elastic-agent/pkg/core/process" ) -// ExternalInputsPattern is a glob that matches the paths of external configuration files. -var ExternalInputsPattern = filepath.Join("inputs.d", "*.yml") - // SettingsConfig is an collection of agent settings configuration. type SettingsConfig struct { DownloadConfig *artifact.Config `yaml:"download" config:"download" json:"download"` diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index d1ee5e371ff..61588f5de97 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -127,7 +127,7 @@ func uninstallComponents(ctx context.Context, cfgFile string) error { return fmt.Errorf("failed to detect inputs and outputs: %w", err) } - cfg, err := operations.LoadFullAgentConfig(cfgFile, false) + cfg, err := operations.LoadFullAgentConfig(log, cfgFile, false) if err != nil { return err } diff --git a/internal/pkg/composable/controller.go b/internal/pkg/composable/controller.go index 116424ae8e4..babd1230586 100644 --- a/internal/pkg/composable/controller.go +++ b/internal/pkg/composable/controller.go @@ -158,6 +158,21 @@ func (c *controller) Run(ctx context.Context) error { c.logger.Debugf("Stopping controller for composable inputs") t.Stop() cancel() + + // wait for all providers to stop (but its possible they still send notifications over notify + // channel, and we cannot block them sending) + emptyChan, emptyCancel := context.WithCancel(context.Background()) + defer emptyCancel() + go func() { + for { + select { + case <-emptyChan.Done(): + return + case <-notify: + } + } + }() + wg.Wait() return ctx.Err() case <-notify: diff --git a/internal/pkg/config/discover.go b/internal/pkg/config/discover.go new file mode 100644 index 00000000000..2408626fdaf --- /dev/null +++ b/internal/pkg/config/discover.go @@ -0,0 +1,39 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package config + +import ( + "errors" + + "github.com/elastic/elastic-agent/internal/pkg/dir" +) + +// ErrNoConfiguration is returned when no configuration are found. +var ErrNoConfiguration = errors.New("no configuration found") + +// DiscoverFunc is a function that discovers a list of files to load. +type DiscoverFunc func() ([]string, error) + +// Discoverer returns a DiscoverFunc that discovers all files that match the given patterns. +func Discoverer(patterns ...string) DiscoverFunc { + p := make([]string, 0, len(patterns)) + for _, newP := range patterns { + if len(newP) == 0 { + continue + } + + p = append(p, newP) + } + + if len(p) == 0 { + return func() ([]string, error) { + return []string{}, ErrNoConfiguration + } + } + + return func() ([]string, error) { + return dir.DiscoverFiles(p...) + } +} diff --git a/internal/pkg/config/operations/inspector.go b/internal/pkg/config/operations/inspector.go index 05ab040d92b..7feaa4e5ef6 100644 --- a/internal/pkg/config/operations/inspector.go +++ b/internal/pkg/config/operations/inspector.go @@ -26,7 +26,7 @@ var ( // LoadFullAgentConfig load agent config based on provided paths and defined capabilities. // In case fleet is used, config from policy action is returned. -func LoadFullAgentConfig(cfgPath string, failOnFleetMissing bool) (*config.Config, error) { +func LoadFullAgentConfig(logger *logger.Logger, cfgPath string, failOnFleetMissing bool) (*config.Config, error) { rawConfig, err := loadConfig(cfgPath) if err != nil { return nil, err @@ -38,7 +38,21 @@ func LoadFullAgentConfig(cfgPath string, failOnFleetMissing bool) (*config.Confi } if configuration.IsStandalone(cfg.Fleet) { - return rawConfig, nil + // When in standalone we load the configuration again with inputs that are defined in the paths.ExternalInputs. + loader := config.NewLoader(logger, paths.ExternalInputs()) + discover := config.Discoverer(cfgPath, cfg.Settings.Path, paths.ExternalInputs()) + files, err := discover() + if err != nil { + return nil, fmt.Errorf("could not discover configuration files: %w", err) + } + if len(files) == 0 { + return nil, config.ErrNoConfiguration + } + c, err := loader.Load(files) + if err != nil { + return nil, fmt.Errorf("failed to load or merge configuration: %w", err) + } + return c, nil } fleetConfig, err := loadFleetConfig() diff --git a/pkg/component/component.go b/pkg/component/component.go index db38bb38471..94c19c8535b 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -5,7 +5,6 @@ package component import ( - "errors" "fmt" "github.com/elastic/elastic-agent-client/v7/pkg/client" @@ -16,7 +15,7 @@ import ( var ( // ErrOutputNotSupported is returned when an input does not support an output type - ErrOutputNotSupported = errors.New("input doesn't support output type") + ErrOutputNotSupported = newError("input doesn't support output type") ) // ErrInputRuntimeCheckFail error is used when an input specification runtime prevention check occurs. @@ -37,25 +36,25 @@ func (e *ErrInputRuntimeCheckFail) Error() string { // Unit is a single input or output that a component must run. type Unit struct { - ID string - Type client.UnitType - Config map[string]interface{} + ID string `yaml:"id"` + Type client.UnitType `yaml:"type"` + Config map[string]interface{} `yaml:"config,omitempty"` } // Component is a set of units that needs to run. type Component struct { // ID is the unique ID of the component. - ID string + ID string `yaml:"id"` // Err used when there is an error with running this input. Used by the runtime to alert // the reason that all of these units are failed. - Err error + Err error `yaml:"error,omitempty"` // Spec on how the input should run. - Spec InputRuntimeSpec + Spec InputRuntimeSpec `yaml:"spec,omitempty"` // Units that should be running inside this component. - Units []Unit + Units []Unit `yaml:"units"` } // ToComponents returns the components that should be running based on the policy and the current runtime specification. diff --git a/pkg/component/error.go b/pkg/component/error.go new file mode 100644 index 00000000000..25d86acaf90 --- /dev/null +++ b/pkg/component/error.go @@ -0,0 +1,32 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package component + +// errorReason is an error that can be marshalled/unmarshalled to and from YAML. +type errorReason struct { + reason string +} + +func newError(reason string) error { + return &errorReason{reason: reason} +} + +func (e *errorReason) Error() string { + return e.reason +} + +func (e *errorReason) MarshalYAML() (interface{}, error) { + return e.reason, nil +} + +func (e *errorReason) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + err := unmarshal(&s) + if err != nil { + return err + } + e.reason = s + return nil +} diff --git a/pkg/component/load.go b/pkg/component/load.go index 62a983f1f9d..2b96c5ad64c 100644 --- a/pkg/component/load.go +++ b/pkg/component/load.go @@ -5,12 +5,12 @@ package component import ( + "errors" "fmt" "io/ioutil" "os" "path/filepath" - "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/go-ucfg/yaml" ) @@ -21,17 +21,17 @@ const ( var ( // ErrInputNotSupported is returned when the input is not supported on any platform - ErrInputNotSupported = errors.New("input not supported") + ErrInputNotSupported = newError("input not supported") // ErrInputNotSupportedOnPlatform is returned when the input is supported but not on this platform - ErrInputNotSupportedOnPlatform = errors.New("input not supported on this platform") + ErrInputNotSupportedOnPlatform = newError("input not supported on this platform") ) // InputRuntimeSpec returns the specification for running this input on the current platform. type InputRuntimeSpec struct { - InputType string - BinaryName string - BinaryPath string - Spec InputSpec + InputType string `yaml:"input_type"` + BinaryName string `yaml:"binary_name"` + BinaryPath string `yaml:"binary_path"` + Spec InputSpec `yaml:"spec"` } // RuntimeSpecs return all the specifications for inputs that are supported on the current platform.