Skip to content

Commit

Permalink
Allow HTTP metrics to run in bootstrap mode. Add ability to adjust ti…
Browse files Browse the repository at this point in the history
…meouts for Fleet Server. (elastic#28260)

* Allow HTTP metrics to run in bootstrap mode. Add ability to adjust timeouts for Fleet Server.

* Add changelog.

* Add the persistent agent configuration to the fleet.yml in bootstrap mode.

* Fix format issues.
  • Loading branch information
blakerouse authored and wiwen committed Nov 1, 2021
1 parent 3f09c63 commit 5f260ff
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 23 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
- Ignore ErrNotExists when fixing permissions. {issue}27836[27836] {pull}27846[27846]
- Snapshot artifact lookup will use agent.download proxy settings. {issue}27903[27903] {pull}27904[27904]
- Fix lazy acker to only add new actions to the batch. {pull}27981[27981]
- Allow HTTP metrics to run in bootstrap mode. Add ability to adjust timeouts for Fleet Server. {pull}28260[28260]

==== New features

Expand Down
21 changes: 21 additions & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,10 @@ func buildEnrollArgs(cfg setupConfig, token string, policyID string) ([]string,
if cfg.FleetServer.Elasticsearch.Insecure {
args = append(args, "--fleet-server-es-insecure")
}
if cfg.FleetServer.Timeout != 0 {
args = append(args, "--fleet-server-timeout")
args = append(args, cfg.FleetServer.Timeout.String())
}
} else {
if cfg.Fleet.URL == "" {
return nil, errors.New("FLEET_URL is required when FLEET_ENROLL is true without FLEET_SERVER_ENABLE")
Expand All @@ -403,6 +407,10 @@ func buildEnrollArgs(cfg setupConfig, token string, policyID string) ([]string,
if token != "" {
args = append(args, "--enrollment-token", token)
}
if cfg.Fleet.DaemonTimeout != 0 {
args = append(args, "--daemon-timeout")
args = append(args, cfg.Fleet.DaemonTimeout.String())
}
return args, nil
}

Expand Down Expand Up @@ -540,6 +548,19 @@ func envBool(keys ...string) bool {
return false
}

func envTimeout(keys ...string) time.Duration {
for _, key := range keys {
val, ok := os.LookupEnv(key)
if ok {
dur, err := time.ParseDuration(val)
if err == nil {
return dur
}
}
}
return 0
}

func envMap(key string) map[string]string {
m := make(map[string]string)
prefix := key + "="
Expand Down
19 changes: 17 additions & 2 deletions x-pack/elastic-agent/pkg/agent/cmd/enroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ import (
"strings"
"syscall"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"

"github.com/spf13/cobra"

c "github.com/elastic/beats/v7/libbeat/common/cli"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli"
Expand Down Expand Up @@ -71,6 +70,8 @@ func addEnrollFlags(cmd *cobra.Command) {
cmd.Flags().BoolP("proxy-disabled", "", false, "Disable proxy support including environment variables")
cmd.Flags().StringSliceP("proxy-header", "", []string{}, "Proxy headers used with CONNECT request")
cmd.Flags().BoolP("delay-enroll", "", false, "Delays enrollment to occur on first start of the Elastic Agent service")
cmd.Flags().DurationP("daemon-timeout", "", 0, "Timeout waiting for Elastic Agent daemon")
cmd.Flags().DurationP("fleet-server-timeout", "", 0, "Timeout waiting for Fleet Server to be ready to start enrollment")
}

func validateEnrollFlags(cmd *cobra.Command) error {
Expand Down Expand Up @@ -119,6 +120,8 @@ func buildEnrollmentFlags(cmd *cobra.Command, url string, token string) []string
fProxyDisabled, _ := cmd.Flags().GetBool("proxy-disabled")
fProxyHeaders, _ := cmd.Flags().GetStringSlice("proxy-header")
delayEnroll, _ := cmd.Flags().GetBool("delay-enroll")
daemonTimeout, _ := cmd.Flags().GetDuration("daemon-timeout")
fTimeout, _ := cmd.Flags().GetDuration("fleet-server-timeout")

args := []string{}
if url != "" {
Expand Down Expand Up @@ -161,6 +164,14 @@ func buildEnrollmentFlags(cmd *cobra.Command, url string, token string) []string
args = append(args, "--fleet-server-cert-key")
args = append(args, fCertKey)
}
if daemonTimeout != 0 {
args = append(args, "--daemon-timeout")
args = append(args, daemonTimeout.String())
}
if fTimeout != 0 {
args = append(args, "--fleet-server-timeout")
args = append(args, fTimeout.String())
}

for k, v := range mapFromEnvList(fHeaders) {
args = append(args, "--header")
Expand Down Expand Up @@ -287,6 +298,8 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
proxyDisabled, _ := cmd.Flags().GetBool("proxy-disabled")
proxyHeaders, _ := cmd.Flags().GetStringSlice("proxy-header")
delayEnroll, _ := cmd.Flags().GetBool("delay-enroll")
daemonTimeout, _ := cmd.Flags().GetDuration("daemon-timeout")
fTimeout, _ := cmd.Flags().GetDuration("fleet-server-timeout")

caStr, _ := cmd.Flags().GetString("certificate-authorities")
CAs := cli.StringToSlice(caStr)
Expand All @@ -308,6 +321,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
ProxyDisabled: proxyDisabled,
ProxyHeaders: mapFromEnvList(proxyHeaders),
DelayEnroll: delayEnroll,
DaemonTimeout: daemonTimeout,
FleetServer: enrollCmdFleetServerOption{
ConnStr: fServer,
ElasticsearchCA: fElasticSearchCA,
Expand All @@ -321,6 +335,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
Insecure: fInsecure,
SpawnAgent: !fromInstall,
Headers: mapFromEnvList(fHeaders),
Timeout: fTimeout,
},
}

Expand Down
68 changes: 55 additions & 13 deletions x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type enrollCmdFleetServerOption struct {
Insecure bool
SpawnAgent bool
Headers map[string]string
Timeout time.Duration
}

// enrollCmdOption define all the supported enrollment option.
Expand All @@ -98,6 +99,7 @@ type enrollCmdOption struct {
ProxyURL string `yaml:"proxy_url,omitempty"`
ProxyDisabled bool `yaml:"proxy_disabled,omitempty"`
ProxyHeaders map[string]string `yaml:"proxy_headers,omitempty"`
DaemonTimeout time.Duration `yaml:"daemon_timeout,omitempty"`
UserProvidedMetadata map[string]interface{} `yaml:"-"`
FixPermissions bool `yaml:"-"`
DelayEnroll bool `yaml:"-"`
Expand Down Expand Up @@ -188,7 +190,7 @@ func (c *enrollCmd) Execute(ctx context.Context, streams *cli.IOStreams) error {
// Connection setup should disable proxies in that case.
localFleetServer := c.options.FleetServer.ConnStr != ""
if localFleetServer && !c.options.DelayEnroll {
token, err := c.fleetServerBootstrap(ctx)
token, err := c.fleetServerBootstrap(ctx, persistentConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -275,14 +277,14 @@ func (c *enrollCmd) writeDelayEnroll(streams *cli.IOStreams) error {
return nil
}

func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) {
func (c *enrollCmd) fleetServerBootstrap(ctx context.Context, persistentConfig map[string]interface{}) (string, error) {
c.log.Debug("verifying communication with running Elastic Agent daemon")
agentRunning := true
_, err := getDaemonStatus(ctx)
if err != nil {
if !c.options.FleetServer.SpawnAgent {
// wait longer to try and communicate with the Elastic Agent
err = waitForAgent(ctx)
err = waitForAgent(ctx, c.options.DaemonTimeout)
if err != nil {
return "", errors.New("failed to communicate with elastic-agent daemon; is elastic-agent running?")
}
Expand All @@ -296,6 +298,11 @@ func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) {
return "", err
}

agentConfig, err := c.createAgentConfig("", persistentConfig, c.options.FleetServer.Headers)
if err != nil {
return "", err
}

fleetConfig, err := createFleetServerBootstrapConfig(
c.options.FleetServer.ConnStr, c.options.FleetServer.ServiceToken,
c.options.FleetServer.PolicyID,
Expand All @@ -312,6 +319,7 @@ func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) {
}

configToStore := map[string]interface{}{
"agent": agentConfig,
"fleet": fleetConfig,
}
reader, err := yamlToReader(configToStore)
Expand All @@ -338,9 +346,9 @@ func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) {
}
}

token, err := waitForFleetServer(ctx, agentSubproc, c.log)
token, err := waitForFleetServer(ctx, agentSubproc, c.log, c.options.FleetServer.Timeout)
if err != nil {
return "", errors.New(err, "fleet-server never started by elastic-agent daemon", errors.TypeApplication)
return "", errors.New(err, "fleet-server failed", errors.TypeApplication)
}
return token, nil
}
Expand Down Expand Up @@ -617,16 +625,28 @@ type waitResult struct {
err error
}

func waitForAgent(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
func waitForAgent(ctx context.Context, timeout time.Duration) error {
if timeout == 0 {
timeout = 1 * time.Minute
}
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
maxBackoff := timeout
if maxBackoff <= 0 {
// indefinite timeout
maxBackoff = 10 * time.Minute
}

resChan := make(chan waitResult)
innerCtx, innerCancel := context.WithCancel(context.Background())
defer innerCancel()
go func() {
backOff := expBackoffWithContext(innerCtx, 1*time.Second, maxBackoff)
for {
<-time.After(1 * time.Second)
backOff.Wait()
_, err := getDaemonStatus(innerCtx)
if err == context.Canceled {
resChan <- waitResult{err: err}
Expand All @@ -653,18 +673,30 @@ func waitForAgent(ctx context.Context) error {
return nil
}

func waitForFleetServer(ctx context.Context, agentSubproc <-chan *os.ProcessState, log *logger.Logger) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
func waitForFleetServer(ctx context.Context, agentSubproc <-chan *os.ProcessState, log *logger.Logger, timeout time.Duration) (string, error) {
if timeout == 0 {
timeout = 2 * time.Minute
}
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
maxBackoff := timeout
if maxBackoff <= 0 {
// indefinite timeout
maxBackoff = 10 * time.Minute
}

resChan := make(chan waitResult)
innerCtx, innerCancel := context.WithCancel(context.Background())
defer innerCancel()
go func() {
msg := ""
msgCount := 0
backExp := expBackoffWithContext(innerCtx, 1*time.Second, maxBackoff)
for {
<-time.After(1 * time.Second)
backExp.Wait()
status, err := getDaemonStatus(innerCtx)
if err == context.Canceled {
resChan <- waitResult{err: err}
Expand Down Expand Up @@ -950,3 +982,13 @@ func getPersistentConfig(pathConfigFile string) (map[string]interface{}, error)

return persistentMap, nil
}

func expBackoffWithContext(ctx context.Context, init, max time.Duration) backoff.Backoff {
signal := make(chan struct{})
bo := backoff.NewExpBackoff(signal, init, max)
go func() {
<-ctx.Done()
close(signal)
}()
return bo
}
20 changes: 12 additions & 8 deletions x-pack/elastic-agent/pkg/agent/cmd/setup_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ type setupConfig struct {
}

type fleetConfig struct {
CA string `config:"ca"`
Enroll bool `config:"enroll"`
EnrollmentToken string `config:"enrollment_token"`
Force bool `config:"force"`
Insecure bool `config:"insecure"`
TokenName string `config:"token_name"`
TokenPolicyName string `config:"token_policy_name"`
URL string `config:"url"`
CA string `config:"ca"`
Enroll bool `config:"enroll"`
EnrollmentToken string `config:"enrollment_token"`
Force bool `config:"force"`
Insecure bool `config:"insecure"`
TokenName string `config:"token_name"`
TokenPolicyName string `config:"token_policy_name"`
URL string `config:"url"`
DaemonTimeout time.Duration `config:"daemon_timeout"`
}

type fleetServerConfig struct {
Expand All @@ -35,6 +36,7 @@ type fleetServerConfig struct {
PolicyID string `config:"policy_id"`
Port string `config:"port"`
Headers map[string]string `config:"headers"`
Timeout time.Duration `config:"timeout"`
}

type elasticsearchConfig struct {
Expand Down Expand Up @@ -83,6 +85,7 @@ func defaultAccessConfig() (setupConfig, error) {
TokenName: envWithDefault("Default", "FLEET_TOKEN_NAME"),
TokenPolicyName: envWithDefault("", "FLEET_TOKEN_POLICY_NAME"),
URL: envWithDefault("", "FLEET_URL"),
DaemonTimeout: envTimeout("FLEET_DAEMON_TIMEOUT"),
},
FleetServer: fleetServerConfig{
Cert: envWithDefault("", "FLEET_SERVER_CERT"),
Expand All @@ -101,6 +104,7 @@ func defaultAccessConfig() (setupConfig, error) {
PolicyID: envWithDefault("", "FLEET_SERVER_POLICY_ID", "FLEET_SERVER_POLICY"),
Port: envWithDefault("", "FLEET_SERVER_PORT"),
Headers: envMap("FLEET_HEADER"),
Timeout: envTimeout("FLEET_SERVER_TIMEOUT"),
},
Kibana: kibanaConfig{
Fleet: kibanaFleetConfig{
Expand Down

0 comments on commit 5f260ff

Please sign in to comment.