// envTimeout returns the duration stored in the first of the given
// environment variables that is both set and accepted by time.ParseDuration.
//
// Keys are tried in the order given. A key that is unset, or whose value does
// not parse as a duration, is skipped without reporting an error and the next
// key is tried. When no key yields a duration the result is 0.
func envTimeout(keys ...string) time.Duration {
	for i := 0; i < len(keys); i++ {
		raw, found := os.LookupEnv(keys[i])
		if !found {
			continue
		}
		// A parse failure is not fatal: fall through to the next candidate key.
		if parsed, perr := time.ParseDuration(raw); perr == nil {
			return parsed
		}
	}
	return 0
}
*cobra.Command, url string, token string) []string args = append(args, "--fleet-server-cert-key") args = append(args, fCertKey) } + if daemonTimeout != 0 { + args = append(args, "--daemon-timeout") + args = append(args, daemonTimeout.String()) + } + if fTimeout != 0 { + args = append(args, "--fleet-server-timeout") + args = append(args, fTimeout.String()) + } for k, v := range mapFromEnvList(fHeaders) { args = append(args, "--header") @@ -287,6 +298,8 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error { proxyDisabled, _ := cmd.Flags().GetBool("proxy-disabled") proxyHeaders, _ := cmd.Flags().GetStringSlice("proxy-header") delayEnroll, _ := cmd.Flags().GetBool("delay-enroll") + daemonTimeout, _ := cmd.Flags().GetDuration("daemon-timeout") + fTimeout, _ := cmd.Flags().GetDuration("fleet-server-timeout") caStr, _ := cmd.Flags().GetString("certificate-authorities") CAs := cli.StringToSlice(caStr) @@ -308,6 +321,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error { ProxyDisabled: proxyDisabled, ProxyHeaders: mapFromEnvList(proxyHeaders), DelayEnroll: delayEnroll, + DaemonTimeout: daemonTimeout, FleetServer: enrollCmdFleetServerOption{ ConnStr: fServer, ElasticsearchCA: fElasticSearchCA, @@ -321,6 +335,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error { Insecure: fInsecure, SpawnAgent: !fromInstall, Headers: mapFromEnvList(fHeaders), + Timeout: fTimeout, }, } diff --git a/x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go b/x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go index 00c99818c087..02f55d56ce83 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go @@ -85,6 +85,7 @@ type enrollCmdFleetServerOption struct { Insecure bool SpawnAgent bool Headers map[string]string + Timeout time.Duration } // enrollCmdOption define all the supported enrollment option. 
@@ -98,6 +99,7 @@ type enrollCmdOption struct { ProxyURL string `yaml:"proxy_url,omitempty"` ProxyDisabled bool `yaml:"proxy_disabled,omitempty"` ProxyHeaders map[string]string `yaml:"proxy_headers,omitempty"` + DaemonTimeout time.Duration `yaml:"daemon_timeout,omitempty"` UserProvidedMetadata map[string]interface{} `yaml:"-"` FixPermissions bool `yaml:"-"` DelayEnroll bool `yaml:"-"` @@ -188,7 +190,7 @@ func (c *enrollCmd) Execute(ctx context.Context, streams *cli.IOStreams) error { // Connection setup should disable proxies in that case. localFleetServer := c.options.FleetServer.ConnStr != "" if localFleetServer && !c.options.DelayEnroll { - token, err := c.fleetServerBootstrap(ctx) + token, err := c.fleetServerBootstrap(ctx, persistentConfig) if err != nil { return err } @@ -275,14 +277,14 @@ func (c *enrollCmd) writeDelayEnroll(streams *cli.IOStreams) error { return nil } -func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) { +func (c *enrollCmd) fleetServerBootstrap(ctx context.Context, persistentConfig map[string]interface{}) (string, error) { c.log.Debug("verifying communication with running Elastic Agent daemon") agentRunning := true _, err := getDaemonStatus(ctx) if err != nil { if !c.options.FleetServer.SpawnAgent { // wait longer to try and communicate with the Elastic Agent - err = waitForAgent(ctx) + err = waitForAgent(ctx, c.options.DaemonTimeout) if err != nil { return "", errors.New("failed to communicate with elastic-agent daemon; is elastic-agent running?") } @@ -296,6 +298,11 @@ func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) { return "", err } + agentConfig, err := c.createAgentConfig("", persistentConfig, c.options.FleetServer.Headers) + if err != nil { + return "", err + } + fleetConfig, err := createFleetServerBootstrapConfig( c.options.FleetServer.ConnStr, c.options.FleetServer.ServiceToken, c.options.FleetServer.PolicyID, @@ -312,6 +319,7 @@ func (c *enrollCmd) fleetServerBootstrap(ctx 
context.Context) (string, error) { } configToStore := map[string]interface{}{ + "agent": agentConfig, "fleet": fleetConfig, } reader, err := yamlToReader(configToStore) @@ -338,9 +346,9 @@ func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) { } } - token, err := waitForFleetServer(ctx, agentSubproc, c.log) + token, err := waitForFleetServer(ctx, agentSubproc, c.log, c.options.FleetServer.Timeout) if err != nil { - return "", errors.New(err, "fleet-server never started by elastic-agent daemon", errors.TypeApplication) + return "", errors.New(err, "fleet-server failed", errors.TypeApplication) } return token, nil } @@ -617,16 +625,28 @@ type waitResult struct { err error } -func waitForAgent(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) - defer cancel() +func waitForAgent(ctx context.Context, timeout time.Duration) error { + if timeout == 0 { + timeout = 1 * time.Minute + } + if timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + maxBackoff := timeout + if maxBackoff <= 0 { + // indefinite timeout + maxBackoff = 10 * time.Minute + } resChan := make(chan waitResult) innerCtx, innerCancel := context.WithCancel(context.Background()) defer innerCancel() go func() { + backOff := expBackoffWithContext(innerCtx, 1*time.Second, maxBackoff) for { - <-time.After(1 * time.Second) + backOff.Wait() _, err := getDaemonStatus(innerCtx) if err == context.Canceled { resChan <- waitResult{err: err} @@ -653,9 +673,20 @@ func waitForAgent(ctx context.Context) error { return nil } -func waitForFleetServer(ctx context.Context, agentSubproc <-chan *os.ProcessState, log *logger.Logger) (string, error) { - ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) - defer cancel() +func waitForFleetServer(ctx context.Context, agentSubproc <-chan *os.ProcessState, log *logger.Logger, timeout time.Duration) (string, error) { + if timeout == 0 { + timeout = 2 * 
time.Minute + } + if timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + maxBackoff := timeout + if maxBackoff <= 0 { + // indefinite timeout + maxBackoff = 10 * time.Minute + } resChan := make(chan waitResult) innerCtx, innerCancel := context.WithCancel(context.Background()) @@ -663,8 +694,9 @@ func waitForFleetServer(ctx context.Context, agentSubproc <-chan *os.ProcessStat go func() { msg := "" msgCount := 0 + backExp := expBackoffWithContext(innerCtx, 1*time.Second, maxBackoff) for { - <-time.After(1 * time.Second) + backExp.Wait() status, err := getDaemonStatus(innerCtx) if err == context.Canceled { resChan <- waitResult{err: err} @@ -950,3 +982,13 @@ func getPersistentConfig(pathConfigFile string) (map[string]interface{}, error) return persistentMap, nil } + +func expBackoffWithContext(ctx context.Context, init, max time.Duration) backoff.Backoff { + signal := make(chan struct{}) + bo := backoff.NewExpBackoff(signal, init, max) + go func() { + <-ctx.Done() + close(signal) + }() + return bo +} diff --git a/x-pack/elastic-agent/pkg/agent/cmd/setup_config.go b/x-pack/elastic-agent/pkg/agent/cmd/setup_config.go index 95057ebd4316..0c34426740ef 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/setup_config.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/setup_config.go @@ -15,14 +15,15 @@ type setupConfig struct { } type fleetConfig struct { - CA string `config:"ca"` - Enroll bool `config:"enroll"` - EnrollmentToken string `config:"enrollment_token"` - Force bool `config:"force"` - Insecure bool `config:"insecure"` - TokenName string `config:"token_name"` - TokenPolicyName string `config:"token_policy_name"` - URL string `config:"url"` + CA string `config:"ca"` + Enroll bool `config:"enroll"` + EnrollmentToken string `config:"enrollment_token"` + Force bool `config:"force"` + Insecure bool `config:"insecure"` + TokenName string `config:"token_name"` + TokenPolicyName string `config:"token_policy_name"` + 
URL string `config:"url"` + DaemonTimeout time.Duration `config:"daemon_timeout"` } type fleetServerConfig struct { @@ -35,6 +36,7 @@ type fleetServerConfig struct { PolicyID string `config:"policy_id"` Port string `config:"port"` Headers map[string]string `config:"headers"` + Timeout time.Duration `config:"timeout"` } type elasticsearchConfig struct { @@ -82,6 +84,7 @@ func defaultAccessConfig() (setupConfig, error) { TokenName: envWithDefault("Default", "FLEET_TOKEN_NAME"), TokenPolicyName: envWithDefault("", "FLEET_TOKEN_POLICY_NAME"), URL: envWithDefault("", "FLEET_URL"), + DaemonTimeout: envTimeout("FLEET_DAEMON_TIMEOUT"), }, FleetServer: fleetServerConfig{ Cert: envWithDefault("", "FLEET_SERVER_CERT"), @@ -100,6 +103,7 @@ func defaultAccessConfig() (setupConfig, error) { PolicyID: envWithDefault("", "FLEET_SERVER_POLICY_ID", "FLEET_SERVER_POLICY"), Port: envWithDefault("", "FLEET_SERVER_PORT"), Headers: envMap("FLEET_HEADER"), + Timeout: envTimeout("FLEET_SERVER_TIMEOUT"), }, Kibana: kibanaConfig{ Fleet: kibanaFleetConfig{