Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow HTTP metrics to run in bootstrap mode. Add ability to adjust timeouts for Fleet Server. #28260

Merged
merged 5 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
- Ignore ErrNotExists when fixing permissions. {issue}27836[27836] {pull}27846[27846]
- Snapshot artifact lookup will use agent.download proxy settings. {issue}27903[27903] {pull}27904[27904]
- Fix lazy acker to only add new actions to the batch. {pull}27981[27981]
- Allow HTTP metrics to run in bootstrap mode. Add ability to adjust timeouts for Fleet Server. {pull}28260[28260]

==== New features

Expand Down
21 changes: 21 additions & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,10 @@ func buildEnrollArgs(cfg setupConfig, token string, policyID string) ([]string,
if cfg.FleetServer.Elasticsearch.Insecure {
args = append(args, "--fleet-server-es-insecure")
}
if cfg.FleetServer.Timeout != 0 {
args = append(args, "--fleet-server-timeout")
args = append(args, cfg.FleetServer.Timeout.String())
}
} else {
if cfg.Fleet.URL == "" {
return nil, errors.New("FLEET_URL is required when FLEET_ENROLL is true without FLEET_SERVER_ENABLE")
Expand All @@ -403,6 +407,10 @@ func buildEnrollArgs(cfg setupConfig, token string, policyID string) ([]string,
if token != "" {
args = append(args, "--enrollment-token", token)
}
if cfg.Fleet.DaemonTimeout != 0 {
args = append(args, "--daemon-timeout")
args = append(args, cfg.Fleet.DaemonTimeout.String())
}
return args, nil
}

Expand Down Expand Up @@ -540,6 +548,19 @@ func envBool(keys ...string) bool {
return false
}

// envTimeout resolves a timeout from the environment.
//
// The given keys are checked in order. The first variable that is set and
// whose value parses with time.ParseDuration wins. Variables that are unset,
// or set to a value that is not a valid duration string, are skipped. When no
// key yields a duration the zero duration is returned.
func envTimeout(keys ...string) time.Duration {
	for i := range keys {
		raw, found := os.LookupEnv(keys[i])
		if !found {
			continue
		}
		// An unparsable value is not fatal; fall through to the next key.
		if parsed, perr := time.ParseDuration(raw); perr == nil {
			return parsed
		}
	}
	return 0
}

func envMap(key string) map[string]string {
m := make(map[string]string)
prefix := key + "="
Expand Down
19 changes: 17 additions & 2 deletions x-pack/elastic-agent/pkg/agent/cmd/enroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ import (
"strings"
"syscall"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"

"github.com/spf13/cobra"

c "github.com/elastic/beats/v7/libbeat/common/cli"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli"
Expand Down Expand Up @@ -71,6 +70,8 @@ func addEnrollFlags(cmd *cobra.Command) {
cmd.Flags().BoolP("proxy-disabled", "", false, "Disable proxy support including environment variables")
cmd.Flags().StringSliceP("proxy-header", "", []string{}, "Proxy headers used with CONNECT request")
cmd.Flags().BoolP("delay-enroll", "", false, "Delays enrollment to occur on first start of the Elastic Agent service")
cmd.Flags().DurationP("daemon-timeout", "", 0, "Timeout waiting for Elastic Agent daemon")
cmd.Flags().DurationP("fleet-server-timeout", "", 0, "Timeout waiting for Fleet Server to be ready to start enrollment")
}

func validateEnrollFlags(cmd *cobra.Command) error {
Expand Down Expand Up @@ -119,6 +120,8 @@ func buildEnrollmentFlags(cmd *cobra.Command, url string, token string) []string
fProxyDisabled, _ := cmd.Flags().GetBool("proxy-disabled")
fProxyHeaders, _ := cmd.Flags().GetStringSlice("proxy-header")
delayEnroll, _ := cmd.Flags().GetBool("delay-enroll")
daemonTimeout, _ := cmd.Flags().GetDuration("daemon-timeout")
fTimeout, _ := cmd.Flags().GetDuration("fleet-server-timeout")

args := []string{}
if url != "" {
Expand Down Expand Up @@ -161,6 +164,14 @@ func buildEnrollmentFlags(cmd *cobra.Command, url string, token string) []string
args = append(args, "--fleet-server-cert-key")
args = append(args, fCertKey)
}
if daemonTimeout != 0 {
args = append(args, "--daemon-timeout")
args = append(args, daemonTimeout.String())
}
if fTimeout != 0 {
args = append(args, "--fleet-server-timeout")
args = append(args, fTimeout.String())
}

for k, v := range mapFromEnvList(fHeaders) {
args = append(args, "--header")
Expand Down Expand Up @@ -287,6 +298,8 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
proxyDisabled, _ := cmd.Flags().GetBool("proxy-disabled")
proxyHeaders, _ := cmd.Flags().GetStringSlice("proxy-header")
delayEnroll, _ := cmd.Flags().GetBool("delay-enroll")
daemonTimeout, _ := cmd.Flags().GetDuration("daemon-timeout")
fTimeout, _ := cmd.Flags().GetDuration("fleet-server-timeout")

caStr, _ := cmd.Flags().GetString("certificate-authorities")
CAs := cli.StringToSlice(caStr)
Expand All @@ -308,6 +321,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
ProxyDisabled: proxyDisabled,
ProxyHeaders: mapFromEnvList(proxyHeaders),
DelayEnroll: delayEnroll,
DaemonTimeout: daemonTimeout,
FleetServer: enrollCmdFleetServerOption{
ConnStr: fServer,
ElasticsearchCA: fElasticSearchCA,
Expand All @@ -321,6 +335,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
Insecure: fInsecure,
SpawnAgent: !fromInstall,
Headers: mapFromEnvList(fHeaders),
Timeout: fTimeout,
},
}

Expand Down
68 changes: 55 additions & 13 deletions x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type enrollCmdFleetServerOption struct {
Insecure bool
SpawnAgent bool
Headers map[string]string
Timeout time.Duration
}

// enrollCmdOption define all the supported enrollment option.
Expand All @@ -98,6 +99,7 @@ type enrollCmdOption struct {
ProxyURL string `yaml:"proxy_url,omitempty"`
ProxyDisabled bool `yaml:"proxy_disabled,omitempty"`
ProxyHeaders map[string]string `yaml:"proxy_headers,omitempty"`
DaemonTimeout time.Duration `yaml:"daemon_timeout,omitempty"`
UserProvidedMetadata map[string]interface{} `yaml:"-"`
FixPermissions bool `yaml:"-"`
DelayEnroll bool `yaml:"-"`
Expand Down Expand Up @@ -188,7 +190,7 @@ func (c *enrollCmd) Execute(ctx context.Context, streams *cli.IOStreams) error {
// Connection setup should disable proxies in that case.
localFleetServer := c.options.FleetServer.ConnStr != ""
if localFleetServer && !c.options.DelayEnroll {
token, err := c.fleetServerBootstrap(ctx)
token, err := c.fleetServerBootstrap(ctx, persistentConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -275,14 +277,14 @@ func (c *enrollCmd) writeDelayEnroll(streams *cli.IOStreams) error {
return nil
}

func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) {
func (c *enrollCmd) fleetServerBootstrap(ctx context.Context, persistentConfig map[string]interface{}) (string, error) {
c.log.Debug("verifying communication with running Elastic Agent daemon")
agentRunning := true
_, err := getDaemonStatus(ctx)
if err != nil {
if !c.options.FleetServer.SpawnAgent {
// wait longer to try and communicate with the Elastic Agent
err = waitForAgent(ctx)
err = waitForAgent(ctx, c.options.DaemonTimeout)
if err != nil {
return "", errors.New("failed to communicate with elastic-agent daemon; is elastic-agent running?")
}
Expand All @@ -296,6 +298,11 @@ func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) {
return "", err
}

agentConfig, err := c.createAgentConfig("", persistentConfig, c.options.FleetServer.Headers)
if err != nil {
return "", err
}

fleetConfig, err := createFleetServerBootstrapConfig(
c.options.FleetServer.ConnStr, c.options.FleetServer.ServiceToken,
c.options.FleetServer.PolicyID,
Expand All @@ -312,6 +319,7 @@ func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) {
}

configToStore := map[string]interface{}{
"agent": agentConfig,
"fleet": fleetConfig,
}
reader, err := yamlToReader(configToStore)
Expand All @@ -338,9 +346,9 @@ func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) {
}
}

token, err := waitForFleetServer(ctx, agentSubproc, c.log)
token, err := waitForFleetServer(ctx, agentSubproc, c.log, c.options.FleetServer.Timeout)
if err != nil {
return "", errors.New(err, "fleet-server never started by elastic-agent daemon", errors.TypeApplication)
return "", errors.New(err, "fleet-server failed", errors.TypeApplication)
}
return token, nil
}
Expand Down Expand Up @@ -617,16 +625,28 @@ type waitResult struct {
err error
}

func waitForAgent(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
func waitForAgent(ctx context.Context, timeout time.Duration) error {
if timeout == 0 {
timeout = 1 * time.Minute
}
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
maxBackoff := timeout
if maxBackoff <= 0 {
// indefinite timeout
maxBackoff = 10 * time.Minute
}

resChan := make(chan waitResult)
innerCtx, innerCancel := context.WithCancel(context.Background())
defer innerCancel()
go func() {
backOff := expBackoffWithContext(innerCtx, 1*time.Second, maxBackoff)
for {
<-time.After(1 * time.Second)
backOff.Wait()
_, err := getDaemonStatus(innerCtx)
if err == context.Canceled {
resChan <- waitResult{err: err}
Expand All @@ -653,18 +673,30 @@ func waitForAgent(ctx context.Context) error {
return nil
}

func waitForFleetServer(ctx context.Context, agentSubproc <-chan *os.ProcessState, log *logger.Logger) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
func waitForFleetServer(ctx context.Context, agentSubproc <-chan *os.ProcessState, log *logger.Logger, timeout time.Duration) (string, error) {
if timeout == 0 {
timeout = 2 * time.Minute
}
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
maxBackoff := timeout
if maxBackoff <= 0 {
// indefinite timeout
maxBackoff = 10 * time.Minute
}

resChan := make(chan waitResult)
innerCtx, innerCancel := context.WithCancel(context.Background())
defer innerCancel()
go func() {
msg := ""
msgCount := 0
backExp := expBackoffWithContext(innerCtx, 1*time.Second, maxBackoff)
for {
<-time.After(1 * time.Second)
backExp.Wait()
status, err := getDaemonStatus(innerCtx)
if err == context.Canceled {
resChan <- waitResult{err: err}
Expand Down Expand Up @@ -950,3 +982,13 @@ func getPersistentConfig(pathConfigFile string) (map[string]interface{}, error)

return persistentMap, nil
}

// expBackoffWithContext builds a backoff via backoff.NewExpBackoff, passing
// init and max through unchanged, and ties its signal channel to ctx.
//
// A helper goroutine blocks until ctx is done and then closes the signal
// channel handed to the backoff. The goroutine only exits once ctx is done,
// so callers should pass a context that is eventually cancelled.
func expBackoffWithContext(ctx context.Context, init, max time.Duration) backoff.Backoff {
	done := make(chan struct{})
	expBackoff := backoff.NewExpBackoff(done, init, max)

	go func() {
		// Close the signal channel as soon as the context finishes.
		defer close(done)
		<-ctx.Done()
	}()

	return expBackoff
}
20 changes: 12 additions & 8 deletions x-pack/elastic-agent/pkg/agent/cmd/setup_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ type setupConfig struct {
}

type fleetConfig struct {
CA string `config:"ca"`
Enroll bool `config:"enroll"`
EnrollmentToken string `config:"enrollment_token"`
Force bool `config:"force"`
Insecure bool `config:"insecure"`
TokenName string `config:"token_name"`
TokenPolicyName string `config:"token_policy_name"`
URL string `config:"url"`
CA string `config:"ca"`
Enroll bool `config:"enroll"`
EnrollmentToken string `config:"enrollment_token"`
Force bool `config:"force"`
Insecure bool `config:"insecure"`
TokenName string `config:"token_name"`
TokenPolicyName string `config:"token_policy_name"`
URL string `config:"url"`
DaemonTimeout time.Duration `config:"daemon_timeout"`
}

type fleetServerConfig struct {
Expand All @@ -35,6 +36,7 @@ type fleetServerConfig struct {
PolicyID string `config:"policy_id"`
Port string `config:"port"`
Headers map[string]string `config:"headers"`
Timeout time.Duration `config:"timeout"`
}

type elasticsearchConfig struct {
Expand Down Expand Up @@ -83,6 +85,7 @@ func defaultAccessConfig() (setupConfig, error) {
TokenName: envWithDefault("Default", "FLEET_TOKEN_NAME"),
TokenPolicyName: envWithDefault("", "FLEET_TOKEN_POLICY_NAME"),
URL: envWithDefault("", "FLEET_URL"),
DaemonTimeout: envTimeout("FLEET_DAEMON_TIMEOUT"),
},
FleetServer: fleetServerConfig{
Cert: envWithDefault("", "FLEET_SERVER_CERT"),
Expand All @@ -101,6 +104,7 @@ func defaultAccessConfig() (setupConfig, error) {
PolicyID: envWithDefault("", "FLEET_SERVER_POLICY_ID", "FLEET_SERVER_POLICY"),
Port: envWithDefault("", "FLEET_SERVER_PORT"),
Headers: envMap("FLEET_HEADER"),
Timeout: envTimeout("FLEET_SERVER_TIMEOUT"),
},
Kibana: kibanaConfig{
Fleet: kibanaFleetConfig{
Expand Down