From a84508c749455ef9228ba1024580279e4cc86ab7 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Mon, 1 Mar 2021 17:20:06 -0500 Subject: [PATCH] [Elastic Agent] Add support for Fleet Server inside Docker (#24220) * Add new container subcommand. * Fix vet. * Fix path with just enroll. * Add changelog. * Add FLEET_SETUP fallback. Make GET, POST to kibana for resilient. * Add FLEET_FORCE. Don't update Kibana config when Fleet Server running locally. --- .../docker-entrypoint.elastic-agent.tmpl | 73 +-- x-pack/elastic-agent/CHANGELOG.next.asciidoc | 1 + .../pkg/agent/application/enroll_cmd.go | 130 ++++-- .../handler_action_policy_change.go | 5 + .../pkg/agent/application/managed_mode.go | 5 +- x-pack/elastic-agent/pkg/agent/cmd/common.go | 1 + .../elastic-agent/pkg/agent/cmd/container.go | 427 ++++++++++++++++++ x-pack/elastic-agent/pkg/agent/cmd/enroll.go | 17 +- x-pack/elastic-agent/pkg/core/process/cmd.go | 4 + .../pkg/core/process/cmd_darwin.go | 4 + .../pkg/core/process/cmd_linux.go | 4 + .../elastic-agent/pkg/core/process/process.go | 15 + .../elastic-agent/pkg/fleetapi/enroll_cmd.go | 14 + 13 files changed, 591 insertions(+), 109 deletions(-) create mode 100644 x-pack/elastic-agent/pkg/agent/cmd/container.go diff --git a/dev-tools/packaging/templates/docker/docker-entrypoint.elastic-agent.tmpl b/dev-tools/packaging/templates/docker/docker-entrypoint.elastic-agent.tmpl index f1e6febfe8a8..864270dc1fef 100644 --- a/dev-tools/packaging/templates/docker/docker-entrypoint.elastic-agent.tmpl +++ b/dev-tools/packaging/templates/docker/docker-entrypoint.elastic-agent.tmpl @@ -2,71 +2,10 @@ set -eo pipefail -# Environment variables used -# FLEET_ENROLLMENT_TOKEN - existing enrollment token to be used for enroll -# FLEET_ENROLL - if set to 1 enroll will be performed -# FLEET_ENROLL_INSECURE - if set to 1, agent will enroll with fleet using --insecure flag -# FLEET_SETUP - if set to 1 fleet setup will be performed -# FLEET_TOKEN_NAME - token name for a token to be created -# KIBANA_HOST - actual kibana host [http://localhost:5601] -# KIBANA_PASSWORD - password for accessing kibana API [changeme] -# KIBANA_USERNAME - username for accessing kibana API [elastic] +# For information on the possible environment variables that can be passed into the container. Run the following +# command for information on the options that are available. +# +# `./elastic-agent container --help` +# -function setup(){ - curl -X POST ${KIBANA_HOST:-http://localhost:5601}/api/fleet/setup -H 'kbn-xsrf: true' -u ${KIBANA_USERNAME:-elastic}:${KIBANA_PASSWORD:-changeme} - curl -X POST ${KIBANA_HOST:-http://localhost:5601}/api/fleet/agents/setup \ - -H 'Content-Type: application/json' \ - -H 'kbn-xsrf: true' \ - -u ${KIBANA_USERNAME:-elastic}:${KIBANA_PASSWORD:-changeme} -} - -function enroll(){ - local enrollResp - local apiKey - - if [[ -n "${FLEET_ENROLLMENT_TOKEN}" ]]; then - apikey="${FLEET_ENROLLMENT_TOKEN}" - else - enrollResp=$(curl ${KIBANA_HOST:-http://localhost:5601}/api/fleet/enrollment-api-keys \ - -H 'Content-Type: application/json' \ - -H 'kbn-xsrf: true' \ - -u ${KIBANA_USERNAME:-elastic}:${KIBANA_PASSWORD:-changeme} ) - - local exitCode=$? - if [ $exitCode -ne 0 ]; then - exit $exitCode - fi - echo $enrollResp - local apikeyId=$(echo $enrollResp | jq -r '.list[] | select((.name | startswith("Default ")) and (.active == true)) | .id') - echo $apikeyId - - if [[ -z "${apikeyId}" ]]; then - echo "Default agent policy was not found. Please consider using own enrollment token (FLEET_ENROLLMENT_TOKEN)." - exit 1 - fi - - enrollResp=$(curl ${KIBANA_HOST:-http://localhost:5601}/api/fleet/enrollment-api-keys/$apikeyId \ - -H 'Content-Type: application/json' \ - -H 'kbn-xsrf: true' \ - -u ${KIBANA_USERNAME:-elastic}:${KIBANA_PASSWORD:-changeme} ) - - exitCode=$? - if [ $exitCode -ne 0 ]; then - exit $exitCode - fi - - apikey=$(echo $enrollResp | jq -r '.item.api_key') - fi - echo $apikey - - if [[ -n "${FLEET_ENROLL_INSECURE}" ]] && [[ ${FLEET_ENROLL_INSECURE} == 1 ]]; then - insecure_flag="--insecure" - fi - - ./{{ .BeatName }} enroll ${insecure_flag} -f --url=${KIBANA_HOST:-http://localhost:5601} --enrollment-token=$apikey -} - -if [[ -n "${FLEET_SETUP}" ]] && [[ ${FLEET_SETUP} == 1 ]]; then setup; fi -if [[ -n "${FLEET_ENROLL}" ]] && [[ ${FLEET_ENROLL} == 1 ]]; then enroll; fi - -exec {{ .BeatName }} run "$@" +exec {{ .BeatName }} container "$@" diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 06b7cbf655d4..e1804b614c98 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -73,3 +73,4 @@ - Add support for Fleet Server {pull}23736[23736] - Add support for enrollment with local bootstrap of Fleet Server {pull}23865[23865] - Add TLS support for Fleet Server {pull}24142[24142] +- Add support for Fleet Server running under Elastic Agent {pull}24220[24220] diff --git a/x-pack/elastic-agent/pkg/agent/application/enroll_cmd.go b/x-pack/elastic-agent/pkg/agent/application/enroll_cmd.go index ca77dc0140e5..65cce3193103 100644 --- a/x-pack/elastic-agent/pkg/agent/application/enroll_cmd.go +++ b/x-pack/elastic-agent/pkg/agent/application/enroll_cmd.go @@ -15,6 +15,8 @@ import ( "os" "time" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process" + "gopkg.in/yaml.v2" "github.com/elastic/beats/v7/libbeat/common/backoff" @@ -72,6 +74,19 @@ type EnrollCmd struct { client clienter configStore store kibanaConfig *kibana.Config + agentProc *process.Info +} + +// EnrollCmdFleetServerOption define all the supported enrollment options for bootstrapping with Fleet Server. +type EnrollCmdFleetServerOption struct { + ConnStr string + PolicyID string + Host string + Port uint16 + Cert string + CertKey string + Insecure bool + SpawnAgent bool } // EnrollCmdOption define all the supported enrollment option. @@ -84,13 +99,7 @@ type EnrollCmdOption struct { UserProvidedMetadata map[string]interface{} EnrollAPIKey string Staging string - FleetServerConnStr string - FleetServerPolicyID string - FleetServerHost string - FleetServerPort uint16 - FleetServerCert string - FleetServerCertKey string - FleetServerInsecure bool + FleetServer EnrollCmdFleetServerOption } func (e *EnrollCmdOption) kibanaConfig() (*kibana.Config, error) { @@ -157,7 +166,8 @@ func NewEnrollCmdWithStore( // Execute tries to enroll the agent into Fleet. func (c *EnrollCmd) Execute(ctx context.Context) error { var err error - if c.options.FleetServerConnStr != "" { + defer c.stopAgent() // ensure its stopped no matter what + if c.options.FleetServer.ConnStr != "" { err = c.fleetServerBootstrap(ctx) if err != nil { return err @@ -185,18 +195,26 @@ func (c *EnrollCmd) Execute(ctx context.Context) error { return errors.New(err, "fail to enroll") } - if c.daemonReload(ctx) != nil { - c.log.Info("Elastic Agent might not be running; unable to trigger restart") + if c.agentProc == nil { + if c.daemonReload(ctx) != nil { + c.log.Info("Elastic Agent might not be running; unable to trigger restart") + } + c.log.Info("Successfully triggered restart on running Elastic Agent.") + return nil } - c.log.Info("Successfully triggered restart on running Elastic Agent.") + c.log.Info("Elastic Agent has been enrolled; start Elastic Agent") return nil } func (c *EnrollCmd) fleetServerBootstrap(ctx context.Context) error { c.log.Debug("verifying communication with running Elastic Agent daemon") + agentRunning := true _, err := getDaemonStatus(ctx) if err != nil { - return errors.New("failed to communicate with elastic-agent daemon; is elastic-agent running?") + if !c.options.FleetServer.SpawnAgent { + return errors.New("failed to communicate with elastic-agent daemon; is elastic-agent running?") + } + agentRunning = false } err = c.prepareFleetTLS() @@ -205,9 +223,9 @@ func (c *EnrollCmd) fleetServerBootstrap(ctx context.Context) error { } fleetConfig, err := createFleetServerBootstrapConfig( - c.options.FleetServerConnStr, c.options.FleetServerPolicyID, - c.options.FleetServerHost, c.options.FleetServerPort, - c.options.FleetServerCert, c.options.FleetServerCertKey) + c.options.FleetServer.ConnStr, c.options.FleetServer.PolicyID, + c.options.FleetServer.Host, c.options.FleetServer.Port, + c.options.FleetServer.Cert, c.options.FleetServer.CertKey) configToStore := map[string]interface{}{ "fleet": fleetConfig, } @@ -219,9 +237,18 @@ func (c *EnrollCmd) fleetServerBootstrap(ctx context.Context) error { return errors.New(err, "could not save fleet server bootstrap information", errors.TypeFilesystem) } - err = c.daemonReload(ctx) - if err != nil { - return errors.New(err, "failed to trigger elastic-agent daemon reload", errors.TypeApplication) + if agentRunning { + // reload the already running agent + err = c.daemonReload(ctx) + if err != nil { + return errors.New(err, "failed to trigger elastic-agent daemon reload", errors.TypeApplication) + } + } else { + // spawn `run` as a subprocess so enroll can perform the bootstrap process of Fleet Server + err = c.startAgent() + if err != nil { + return err + } } err = waitForFleetServer(ctx, c.log) @@ -232,25 +259,25 @@ func (c *EnrollCmd) fleetServerBootstrap(ctx context.Context) error { } func (c *EnrollCmd) prepareFleetTLS() error { - host := c.options.FleetServerHost + host := c.options.FleetServer.Host if host == "" { host = "localhost" } - port := c.options.FleetServerPort + port := c.options.FleetServer.Port if port == 0 { port = defaultFleetServerPort } - if c.options.FleetServerCert != "" && c.options.FleetServerCertKey == "" { + if c.options.FleetServer.Cert != "" && c.options.FleetServer.CertKey == "" { return errors.New("certificate private key is required when certificate provided") } - if c.options.FleetServerCertKey != "" && c.options.FleetServerCert == "" { + if c.options.FleetServer.CertKey != "" && c.options.FleetServer.Cert == "" { return errors.New("certificate is required when certificate private key is provided") } - if c.options.FleetServerCert == "" && c.options.FleetServerCertKey == "" { - if c.options.FleetServerInsecure { + if c.options.FleetServer.Cert == "" && c.options.FleetServer.CertKey == "" { + if c.options.FleetServer.Insecure { // running insecure, force the binding to localhost (unless specified) - if c.options.FleetServerHost == "" { - c.options.FleetServerHost = "localhost" + if c.options.FleetServer.Host == "" { + c.options.FleetServer.Host = "localhost" } c.options.URL = fmt.Sprintf("http://%s:%d", host, port) c.options.Insecure = true @@ -270,8 +297,8 @@ func (c *EnrollCmd) prepareFleetTLS() error { if err != nil { return err } - c.options.FleetServerCert = string(pair.Crt) - c.options.FleetServerCertKey = string(pair.Key) + c.options.FleetServer.Cert = string(pair.Crt) + c.options.FleetServer.CertKey = string(pair.Key) c.options.URL = fmt.Sprintf("https://%s:%d", hostname, port) c.options.CAs = []string{string(ca.Crt())} } @@ -295,8 +322,18 @@ func (c *EnrollCmd) enrollWithBackoff(ctx context.Context) error { signal := make(chan struct{}) backExp := backoff.NewExpBackoff(signal, 60*time.Second, 10*time.Minute) - for errors.Is(err, fleetapi.ErrTooManyRequests) { - c.log.Warn("Too many requests on the remote server, will retry in a moment.") + for { + retry := false + if errors.Is(err, fleetapi.ErrTooManyRequests) { + c.log.Warn("Too many requests on the remote server, will retry in a moment.") + retry = true + } else if errors.Is(err, fleetapi.ErrConnRefused) { + c.log.Warn("Remote server is not ready to accept connections, will retry in a moment.") + retry = true + } + if !retry { + break + } backExp.Wait() c.log.Info("Retrying to enroll...") err = c.enroll(ctx) @@ -344,11 +381,11 @@ func (c *EnrollCmd) enroll(ctx context.Context) error { "sourceURI": staging, } } - if c.options.FleetServerConnStr != "" { + if c.options.FleetServer.ConnStr != "" { serverConfig, err := createFleetServerBootstrapConfig( - c.options.FleetServerConnStr, c.options.FleetServerPolicyID, - c.options.FleetServerHost, c.options.FleetServerPort, - c.options.FleetServerCert, c.options.FleetServerCertKey) + c.options.FleetServer.ConnStr, c.options.FleetServer.PolicyID, + c.options.FleetServer.Host, c.options.FleetServer.Port, + c.options.FleetServer.Cert, c.options.FleetServer.CertKey) if err != nil { return err } @@ -390,6 +427,27 @@ func (c *EnrollCmd) enroll(ctx context.Context) error { return nil } +func (c *EnrollCmd) startAgent() error { + cmd, err := os.Executable() + if err != nil { + return err + } + c.log.Info("Spawning Elastic Agent daemon as a subprocess to complete bootstrap process.") + proc, err := process.Start(c.log, cmd, nil, os.Geteuid(), os.Getegid(), "run") + if err != nil { + return err + } + c.agentProc = proc + return nil +} + +func (c *EnrollCmd) stopAgent() { + if c.agentProc != nil { + c.agentProc.StopWait() + c.agentProc = nil + } +} + func yamlToReader(in interface{}) (io.Reader, error) { data, err := yaml.Marshal(in) if err != nil { @@ -461,6 +519,10 @@ func waitForFleetServer(ctx context.Context, log *logger.Logger) error { // app has started and is running resChan <- waitResult{} break + } else if app.Status == proto.Status_FAILED { + // app completely failed; exit now + resChan <- waitResult{err: errors.New(app.Message)} + break } if app.Message != "" { appMsg := fmt.Sprintf("Fleet Server - %s", app.Message) diff --git a/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go b/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go index b921569a19b4..bb9297998e7b 100644 --- a/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go +++ b/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go @@ -62,6 +62,11 @@ func (h *handlerPolicyChange) Handle(ctx context.Context, a action, acker fleetA } func (h *handlerPolicyChange) handleKibanaHosts(c *config.Config) (err error) { + // do not update kibana host from policy; no setters provided with local Fleet Server + if len(h.setters) == 0 { + return nil + } + cfg, err := configuration.NewFromConfig(c) if err != nil { return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig) diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index c12c2451e3a8..242b556594c2 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -181,7 +181,10 @@ func newManaged( agentInfo: agentInfo, config: cfg, store: store, - setters: []clientSetter{acker}, + } + if cfg.Fleet.Server == nil { + // setters only set when not running a local Fleet Server + policyChanger.setters = []clientSetter{acker} } actionDispatcher.MustRegister( &fleetapi.ActionPolicyChange{}, diff --git a/x-pack/elastic-agent/pkg/agent/cmd/common.go b/x-pack/elastic-agent/pkg/agent/cmd/common.go index a25b43f13942..60f01cadf151 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/common.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/common.go @@ -73,6 +73,7 @@ func NewCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Command { cmd.AddCommand(newEnrollCommandWithArgs(flags, args, streams)) cmd.AddCommand(newInspectCommandWithArgs(flags, args, streams)) cmd.AddCommand(newWatchCommandWithArgs(flags, args, streams)) + cmd.AddCommand(newContainerCommand(flags, args, streams)) // windows special hidden sub-command (only added on windows) reexec := newReExecWindowsCommand(flags, args, streams) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/container.go b/x-pack/elastic-agent/pkg/agent/cmd/container.go new file mode 100644 index 000000000000..b35442df504a --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/cmd/container.go @@ -0,0 +1,427 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cmd + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/url" + "os" + "os/exec" + "strings" + "time" + + "github.com/spf13/cobra" + + "github.com/elastic/beats/v7/libbeat/kibana" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" +) + +const ( + defaultKibanaHost = "http://kibana:5601" + defaultESHost = "http://elasticsearch:9200" + defaultUsername = "elastic" + defaultPassword = "changeme" + defaultTokenName = "Default" + + requestRetrySleep = 1 * time.Second // sleep 1 sec between retries for HTTP requests + maxRequestRetries = 30 // maximum number of retries for HTTP requests +) + +func newContainerCommand(flags *globalFlags, _ []string, streams *cli.IOStreams) *cobra.Command { + return &cobra.Command{ + Hidden: true, // not exposed over help; used by container entrypoint only + Use: "container", + Short: "Bootstrap Elastic Agent to run inside of a container", + Long: `This should only be used as an entrypoint for a container. This will prepare the Elastic Agent using +environment variables to run inside of the container. + +The following actions are possible and grouped based on the actions. + +* Preparing Kibana for Fleet + This prepares the Fleet plugin that exists inside of Kibana. This must either be enabled here or done externally + before Fleet Server will actually successfully start. + + KIBANA_FLEET_SETUP - set to 1 enables this setup + KIBANA_FLEET_HOST - kibana host to enable Fleet on [$KIBANA_HOST] + KIBANA_FLEET_USERNAME - kibana username to enable Fleet [$KIBANA_USERNAME] + KIBANA_FLEET_PASSWORD - kibana password to enable Fleet [$KIBANA_PASSWORD] + +* Bootstrapping Fleet Server + This bootstraps the Fleet Server to be run by this Elastic Agent. At least one Fleet Server is required in a Fleet + deployment for other Elastic Agent to bootstrap. + + FLEET_SERVER_ENABLE - set to 1 enables bootstrapping of Fleet Server (forces FLEET_ENROLL enabled) + FLEET_SERVER_ELASTICSEARCH_HOST - elasticsearch host for Fleet Server to communicate with [$ELASTICSEARCH_HOST] + FLEET_SERVER_ELASTICSEARCH_USERNAME - elasticsearch username for Fleet Server [$ELASTICSEARCH_USERNAME] + FLEET_SERVER_ELASTICSEARCH_PASSWORD - elasticsearch password for Fleet Server [$ELASTICSEARCH_PASSWORD] + FLEET_SERVER_POLICY_NAME - name of policy for the Fleet Server to use for itself [$FLEET_TOKEN_POLICY_NAME] + FLEET_SERVER_POLICY_ID - policy ID for Fleet Server to use for itself ("Default Fleet Server policy" used when undefined) + FLEET_SERVER_HOST - binding host for Fleet Server HTTP (overrides the policy) + FLEET_SERVER_PORT - binding port for Fleet Server HTTP (overrides the policy) + FLEET_SERVER_CERT - path to certificate to use for HTTPS endpoint + FLEET_SERVER_CERT_KEY - path to private key for certificate to use for HTTPS endpoint + FLEET_SERVER_INSECURE_HTTP - expose Fleet Server over HTTP (not recommended; insecure) + +* Elastic Agent Fleet Enrollment + This enrolls the Elastic Agent into a Fleet Server. It is also possible to have this create a new enrollment token + for this specific Elastic Agent. + + FLEET_ENROLL - set to 1 for enrollment to occur + FLEET_URL - URL of the Fleet Server to enroll into + FLEET_ENROLLMENT_TOKEN - token to use for enrollment + FLEET_TOKEN_NAME - token name to use for fetching token from Kibana + FLEET_TOKEN_POLICY_NAME - token policy name to use for fetching token from Kibana + FLEET_INSECURE - communicate with Fleet with either insecure HTTP or un-verified HTTPS + KIBANA_FLEET_HOST - kibana host to enable create enrollment token on [$KIBANA_HOST] + KIBANA_FLEET_USERNAME - kibana username to create enrollment token [$KIBANA_USERNAME] + KIBANA_FLEET_PASSWORD - kibana password to create enrollment token [$KIBANA_PASSWORD] + +The following environment variables are provided as a convenience to prevent a large number of environment variable to +be used when the same credentials will be used across all the possible actions above. + + ELASTICSEARCH_HOST - elasticsearch host [http://elasticsearch:9200] + ELASTICSEARCH_USERNAME - elasticsearch username [elastic] + ELASTICSEARCH_PASSWORD - elasticsearch password [changeme] + KIBANA_HOST - kibana host [http://kibana:5601] + KIBANA_USERNAME - kibana username [$ELASTICSEARCH_USERNAME] + KIBANA_PASSWORD - kibana password [$ELASTICSEARCH_PASSWORD] + +By default when this command starts it will check for an existing fleet.yml. If that file already exists then +all the above actions will be skipped, because the Elastic Agent has already been enrolled. To ensure that enrollment +occurs on every start of the container set FLEET_FORCE to 1. +`, + Run: func(c *cobra.Command, args []string) { + if err := containerCmd(streams, c, flags, args); err != nil { + fmt.Fprintf(streams.Err, "Error: %v\n", err) + os.Exit(1) + } + }, + } +} + +func containerCmd(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args []string) error { + var err error + var client *kibana.Client + executable, err := os.Executable() + if err != nil { + return err + } + + _, err = os.Stat(info.AgentConfigFile()) + if !os.IsNotExist(err) && !envBool("FLEET_FORCE") { + // already enrolled, just run the standard run + return run(flags, streams) + } + + // Remove FLEET_SETUP in 8.x + // The FLEET_SETUP environment variable boolean is a fallback to the old name. The name was updated to + // reflect that its setting up Fleet in Kibana versus setting up Fleet Server. + if envBool("KIBANA_FLEET_SETUP", "FLEET_SETUP") { + client, err = kibanaClient() + if err != nil { + return err + } + fmt.Fprintf(streams.Out, "Performing setup of Fleet in Kibana\n") + err = kibanaSetup(client, streams) + if err != nil { + return err + } + } + if envBool("FLEET_ENROLL", "FLEET_SERVER_ENABLE") { + if client == nil { + client, err = kibanaClient() + if err != nil { + return err + } + } + var policy *kibanaPolicy + token := envWithDefault("", "FLEET_ENROLLMENT_TOKEN") + if token == "" { + policy, err = kibanaFetchPolicy(client, streams) + if err != nil { + return err + } + token, err = kibanaFetchToken(client, policy, streams) + if err != nil { + return err + } + } + policyID := "" + if policy != nil { + policyID = policy.ID + } + cmdArgs, err := buildEnrollArgs(token, policyID) + if err != nil { + return err + } + enroll := exec.Command(executable, cmdArgs...) + enroll.Stdout = os.Stdout + enroll.Stderr = os.Stderr + err = enroll.Start() + if err != nil { + return errors.New("failed to execute enrollment command", err) + } + err = enroll.Wait() + if err != nil { + return errors.New("enrollment failed", err) + } + } + + return run(flags, streams) +} + +func buildEnrollArgs(token string, policyID string) ([]string, error) { + args := []string{"enroll", "-f"} + if envBool("FLEET_SERVER_ENABLE") { + connStr, err := buildFleetServerConnStr() + if err != nil { + return nil, err + } + args = append(args, "--fleet-server", connStr) + if policyID == "" { + policyID = envWithDefault("", "FLEET_SERVER_POLICY_ID") + } + if policyID != "" { + args = append(args, "--fleet-server-policy", policyID) + } + host := envWithDefault("", "FLEET_SERVER_HOST") + if host != "" { + args = append(args, "--fleet-server-host", host) + } + port := envWithDefault("", "FLEET_SERVER_PORT") + if port != "" { + args = append(args, "--fleet-server-port", port) + } + cert := envWithDefault("", "FLEET_SERVER_CERT") + if cert != "" { + args = append(args, "--fleet-server-cert", cert) + } + certKey := envWithDefault("", "FLEET_SERVER_CERT_KEY") + if certKey != "" { + args = append(args, "--fleet-server-cert-key", certKey) + } + if envBool("FLEET_SERVER_INSECURE_HTTP") { + args = append(args, "--fleet-server--insecure-http") + args = append(args, "--insecure") + } + } else { + url := envWithDefault("", "FLEET_URL") + if url == "" { + return nil, errors.New("FLEET_URL is required when FLEET_ENROLL is true without FLEET_SERVER_ENABLE") + } + args = append(args, "--url", url) + if envBool("FLEET_INSECURE") { + args = append(args, "--insecure") + } + } + args = append(args, "--enrollment-token", token) + return args, nil +} + +func buildFleetServerConnStr() (string, error) { + host := envWithDefault(defaultESHost, "FLEET_SERVER_ELASTICSEARCH_HOST", "ELASTICSEARCH_HOST") + username := envWithDefault(defaultUsername, "FLEET_SERVER_ELASTICSEARCH_USERNAME", "$ELASTICSEARCH_USERNAME") + password := envWithDefault(defaultPassword, "FLEET_SERVER_ELASTICSEARCH_PASSWORD", "$ELASTICSEARCH_PASSWORD") + u, err := url.Parse(host) + if err != nil { + return "", err + } + path := "" + if u.Path != "" { + path += "/" + strings.TrimLeft(u.Path, "/") + } + return fmt.Sprintf("%s://%s:%s@%s%s", u.Scheme, username, password, u.Host, path), nil +} + +func kibanaSetup(client *kibana.Client, streams *cli.IOStreams) error { + err := performPOST(client, "/api/fleet/setup", streams.Err, "Kibana Fleet setup") + if err != nil { + return err + } + err = performPOST(client, "/api/fleet/agents/setup", streams.Err, "Kibana Fleet Agents setup") + if err != nil { + return err + } + return nil +} + +func kibanaFetchPolicy(client *kibana.Client, streams *cli.IOStreams) (*kibanaPolicy, error) { + var policies kibanaPolicies + err := performGET(client, "/api/fleet/agent_policies", &policies, streams.Err, "Kibana fetch policy") + if err != nil { + return nil, err + } + return findPolicy(policies.Items) +} + +func kibanaFetchToken(client *kibana.Client, policy *kibanaPolicy, streams *cli.IOStreams) (string, error) { + var keys kibanaAPIKeys + err := performGET(client, "/api/fleet/enrollment-api-keys", &keys, streams.Err, "Kibana fetch token") + if err != nil { + return "", err + } + key, err := findKey(keys.List, policy) + if err != nil { + return "", err + } + return key.APIKey, nil +} + +func kibanaClient() (*kibana.Client, error) { + host := envWithDefault(defaultKibanaHost, "KIBANA_FLEET_HOST", "KIBANA_HOST") + username := envWithDefault(defaultUsername, "KIBANA_FLEET_USERNAME", "KIBANA_USERNAME", "$ELASTICSEARCH_USERNAME") + password := envWithDefault(defaultPassword, "KIBANA_FLEET_PASSWORD", "KIBANA_PASSWORD", "$ELASTICSEARCH_PASSWORD") + return kibana.NewClientWithConfig(&kibana.ClientConfig{ + Host: host, + Username: username, + Password: password, + IgnoreVersion: true, + }) +} + +func findPolicy(policies []kibanaPolicy) (*kibanaPolicy, error) { + fleetServerEnabled := envBool("FLEET_SERVER_ENABLE") + policyName := envWithDefault("", "FLEET_TOKEN_POLICY_NAME") + if fleetServerEnabled { + policyName = envWithDefault("", "FLEET_SERVER_POLICY_NAME", "FLEET_TOKEN_POLICY_NAME") + } + for _, policy := range policies { + if policy.Status != "active" { + continue + } + if policyName != "" { + if policyName == policy.Name { + return &policy, nil + } + } else if fleetServerEnabled { + if policy.IsDefaultFleetServer { + return &policy, nil + } + } else { + if policy.IsDefault { + return &policy, nil + } + } + } + return nil, fmt.Errorf(`unable to find policy named "%s"`, policyName) +} + +func findKey(keys []kibanaAPIKey, policy *kibanaPolicy) (*kibanaAPIKey, error) { + tokenName := envWithDefault(defaultTokenName, "FLEET_TOKEN_NAME") + for _, key := range keys { + name := strings.TrimSpace(strings.Replace(key.Name, fmt.Sprintf(" (%s)", key.ID), "", 1)) + if name == tokenName && key.PolicyID == policy.ID { + return &key, nil + } + } + return nil, fmt.Errorf(`unable to find enrollment token named "%s" in policy "%s"`, tokenName, policy.Name) +} + +func envWithDefault(def string, keys ...string) string { + for _, key := range keys { + val, ok := os.LookupEnv(key) + if ok { + return val + } + } + return def +} + +func envBool(keys ...string) bool { + for _, key := range keys { + val, ok := os.LookupEnv(key) + if ok && isTrue(val) { + return true + } + } + return false +} + +func isTrue(val string) bool { + trueVals := []string{"1", "true", "yes", "y"} + val = strings.ToLower(val) + for _, v := range trueVals { + if val == v { + return true + } + } + return false +} + +func performGET(client *kibana.Client, path string, response interface{}, writer io.Writer, msg string) error { + var lastErr error + for i := 0; i < maxRequestRetries; i++ { + code, result, err := client.Connection.Request("GET", path, nil, nil, nil) + if err != nil || code != 200 { + err = fmt.Errorf("http GET request to %s%s fails: %v. Response: %s", + client.Connection.URL, path, err, truncateString(result)) + fmt.Fprintf(writer, "%s failed: %s\n", msg, err) + <-time.After(requestRetrySleep) + continue + } + if response == nil { + return nil + } + return json.Unmarshal(result, response) + } + return lastErr +} + +func performPOST(client *kibana.Client, path string, writer io.Writer, msg string) error { + var lastErr error + for i := 0; i < maxRequestRetries; i++ { + code, result, err := client.Connection.Request("POST", path, nil, nil, nil) + if err != nil || code >= 400 { + err = fmt.Errorf("http POST request to %s%s fails: %v. Response: %s", + client.Connection.URL, path, err, truncateString(result)) + lastErr = err + fmt.Fprintf(writer, "%s failed: %s\n", msg, err) + <-time.After(requestRetrySleep) + continue + } + return nil + } + return lastErr +} + +func truncateString(b []byte) string { + const maxLength = 250 + runes := bytes.Runes(b) + if len(runes) > maxLength { + runes = append(runes[:maxLength], []rune("... (truncated)")...) + } + + return strings.Replace(string(runes), "\n", " ", -1) +} + +type kibanaPolicy struct { + ID string `json:"id"` + Name string `json:"name"` + Status string `json:"status"` + IsDefault bool `json:"is_default"` + IsDefaultFleetServer bool `json:"is_default_fleet_server"` +} + +type kibanaPolicies struct { + Items []kibanaPolicy `json:"items"` +} + +type kibanaAPIKey struct { + ID string `json:"id"` + Name string `json:"name"` + Active bool `json:"active"` + PolicyID string `json:"policy_id"` + APIKey string `json:"api_key"` +} + +type kibanaAPIKeys struct { + List []kibanaAPIKey `json:"list"` +} diff --git a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go index 32201329fc10..2a000684e2d5 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go @@ -222,13 +222,16 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args Insecure: insecure, UserProvidedMetadata: make(map[string]interface{}), Staging: staging, - FleetServerConnStr: fServer, - FleetServerPolicyID: fPolicy, - FleetServerHost: fHost, - FleetServerPort: fPort, - FleetServerCert: fCert, - FleetServerCertKey: fCertKey, - FleetServerInsecure: fInsecure, + FleetServer: application.EnrollCmdFleetServerOption{ + ConnStr: fServer, + PolicyID: fPolicy, + Host: fHost, + Port: fPort, + Cert: fCert, + CertKey: fCertKey, + Insecure: fInsecure, + SpawnAgent: !fromInstall, + }, } c, err := application.NewEnrollCmd( diff --git a/x-pack/elastic-agent/pkg/core/process/cmd.go b/x-pack/elastic-agent/pkg/core/process/cmd.go index d1541a05e180..3e4bc5da7d1a 100644 --- a/x-pack/elastic-agent/pkg/core/process/cmd.go +++ b/x-pack/elastic-agent/pkg/core/process/cmd.go @@ -23,3 +23,7 @@ func getCmd(logger *logger.Logger, path string, env []string, uid, gid int, arg return cmd } + +func terminateCmd(proc *os.Process) error { + return proc.Kill() +} diff --git a/x-pack/elastic-agent/pkg/core/process/cmd_darwin.go b/x-pack/elastic-agent/pkg/core/process/cmd_darwin.go index 0d1921941573..6f17b61c800d 100644 --- a/x-pack/elastic-agent/pkg/core/process/cmd_darwin.go +++ b/x-pack/elastic-agent/pkg/core/process/cmd_darwin.go @@ -39,3 +39,7 @@ func getCmd(logger *logger.Logger, path string, env []string, uid, gid int, arg func isInt32(val int) bool { return val >= 0 && val <= math.MaxInt32 } + +func terminateCmd(proc *os.Process) error { + return proc.Signal(syscall.SIGTERM) +} diff --git a/x-pack/elastic-agent/pkg/core/process/cmd_linux.go b/x-pack/elastic-agent/pkg/core/process/cmd_linux.go index 3c28ab54c4be..d580dc80beda 100644 --- a/x-pack/elastic-agent/pkg/core/process/cmd_linux.go +++ b/x-pack/elastic-agent/pkg/core/process/cmd_linux.go @@ -42,3 +42,7 @@ func getCmd(logger *logger.Logger, path string, env []string, uid, gid int, arg func isInt32(val int) bool { return val >= 0 && val <= math.MaxInt32 } + +func terminateCmd(proc *os.Process) error { + return proc.Signal(syscall.SIGTERM) +} diff --git a/x-pack/elastic-agent/pkg/core/process/process.go b/x-pack/elastic-agent/pkg/core/process/process.go index 81f1960bf8bb..9dcb86ae620e 100644 --- a/x-pack/elastic-agent/pkg/core/process/process.go +++ b/x-pack/elastic-agent/pkg/core/process/process.go @@ -48,3 +48,18 @@ func Start(logger *logger.Logger, path string, config *Config, uid, gid int, arg Stdin: stdin, }, err } + +// Stop stops the process cleanly. +func (i *Info) Stop() error { + return terminateCmd(i.Process) +} + +// StopWait stops the process and waits for it to exit. +func (i *Info) StopWait() error { + err := i.Stop() + if err != nil { + return err + } + _, err = i.Process.Wait() + return err +} diff --git a/x-pack/elastic-agent/pkg/fleetapi/enroll_cmd.go b/x-pack/elastic-agent/pkg/fleetapi/enroll_cmd.go index 29168be0ae4b..fc3949799a2e 100644 --- a/x-pack/elastic-agent/pkg/fleetapi/enroll_cmd.go +++ b/x-pack/elastic-agent/pkg/fleetapi/enroll_cmd.go @@ -9,7 +9,9 @@ import ( "context" "encoding/json" "fmt" + "net" "net/http" + "net/url" "time" "github.com/hashicorp/go-multierror" @@ -24,6 +26,9 @@ type EnrollType string // ErrTooManyRequests is received when the remote server is overloaded. var ErrTooManyRequests = errors.New("too many requests received (429)") +// ErrConnRefused is returned when the connection to the server is refused. +var ErrConnRefused = errors.New("connection refused") + const ( // PermanentEnroll is default enrollment type, by default an Agent is permanently enroll to Agent. PermanentEnroll = EnrollType("PERMANENT") @@ -187,6 +192,15 @@ func (e *EnrollCmd) Execute(ctx context.Context, r *EnrollRequest) (*EnrollRespo resp, err := e.client.Send(ctx, "POST", p, nil, headers, bytes.NewBuffer(b)) if err != nil { + // connection refused is returned as a clean type + switch et := err.(type) { + case *url.Error: + err = et.Err + } + switch err.(type) { + case *net.OpError: + return nil, ErrConnRefused + } return nil, err } defer resp.Body.Close()