Skip to content

Commit

Permalink
[Elastic Agent] Add support for Fleet Server inside Docker (#24220) (#…
Browse files Browse the repository at this point in the history
…24291)

* Add new container subcommand.

* Fix vet.

* Fix path with just enroll.

* Add changelog.

* Add FLEET_SETUP fallback. Make GET, POST to kibana for resilient.

* Add FLEET_FORCE. Don't update Kibana config when Fleet Server running locally.

(cherry picked from commit a84508c)
  • Loading branch information
blakerouse authored Mar 2, 2021
1 parent c807f79 commit 3647fef
Show file tree
Hide file tree
Showing 13 changed files with 591 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,71 +2,10 @@

set -eo pipefail

# Environment variables used
# FLEET_CONFIG_ID - config related to new token [defaul]
# FLEET_ENROLLMENT_TOKEN - existing enrollment token to be used for enroll
# FLEET_ENROLL - if set to 1 enroll will be performed
# FLEET_ENROLL_INSECURE - if set to 1, agent will enroll with fleet using --insecure flag
# FLEET_SETUP - if set to 1 fleet setup will be performed
# FLEET_TOKEN_NAME - token name for a token to be created
# KIBANA_HOST - actual kibana host [http://localhost:5601]
# KIBANA_PASSWORD - password for accessing kibana API [changeme]
# KIBANA_USERNAME - username for accessing kibana API [elastic]
# For information on the possible environment variables that can be passed into the container. Run the following
# command for information on the options that are available.
#
# `./elastic-agent container --help`
#

function setup(){
curl -X POST ${KIBANA_HOST:-http://localhost:5601}/api/fleet/setup -H 'kbn-xsrf: true' -u ${KIBANA_USERNAME:-elastic}:${KIBANA_PASSWORD:-changeme}
curl -X POST ${KIBANA_HOST:-http://localhost:5601}/api/fleet/agents/setup \
-H 'Content-Type: application/json' \
-H 'kbn-xsrf: true' \
-u ${KIBANA_USERNAME:-elastic}:${KIBANA_PASSWORD:-changeme}
}

function enroll(){
local enrollResp
local apiKey

if [[ -n "${FLEET_ENROLLMENT_TOKEN}" ]]; then
apikey="${FLEET_ENROLLMENT_TOKEN}"
else
enrollResp=$(curl ${KIBANA_HOST:-http://localhost:5601}/api/fleet/enrollment-api-keys \
-H 'Content-Type: application/json' \
-H 'kbn-xsrf: true' \
-u ${KIBANA_USERNAME:-elastic}:${KIBANA_PASSWORD:-changeme} )

local exitCode=$?
if [ $exitCode -ne 0 ]; then
exit $exitCode
fi
echo $enrollResp
local apikeyId=$(echo $enrollResp | jq -r '.list[] | select((.name | startswith("Default ")) and (.active == true)) | .id')
echo $apikeyId

if [[ -z "${apikeyId}" ]]; then
echo "Default agent policy was not found. Please consider using own enrollment token (FLEET_ENROLLMENT_TOKEN)."
exit 1
fi

enrollResp=$(curl ${KIBANA_HOST:-http://localhost:5601}/api/fleet/enrollment-api-keys/$apikeyId \
-H 'Content-Type: application/json' \
-H 'kbn-xsrf: true' \
-u ${KIBANA_USERNAME:-elastic}:${KIBANA_PASSWORD:-changeme} )

exitCode=$?
if [ $exitCode -ne 0 ]; then
exit $exitCode
fi
apikey=$(echo $enrollResp | jq -r '.item.api_key')
fi
echo $apikey

if [[ -n "${FLEET_ENROLL_INSECURE}" ]] && [[ ${FLEET_ENROLL_INSECURE} == 1 ]]; then
insecure_flag="--insecure"
fi

./{{ .BeatName }} enroll ${insecure_flag} -f --url=${KIBANA_HOST:-http://localhost:5601} --enrollment-token=$apikey
}

if [[ -n "${FLEET_SETUP}" ]] && [[ ${FLEET_SETUP} == 1 ]]; then setup; fi
if [[ -n "${FLEET_ENROLL}" ]] && [[ ${FLEET_ENROLL} == 1 ]]; then enroll; fi

exec {{ .BeatName }} run "$@"
exec {{ .BeatName }} container "$@"
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,4 @@
- Add support for Fleet Server {pull}23736[23736]
- Add support for enrollment with local bootstrap of Fleet Server {pull}23865[23865]
- Add TLS support for Fleet Server {pull}24142[24142]
- Add support for Fleet Server running under Elastic Agent {pull}24220[24220]
130 changes: 96 additions & 34 deletions x-pack/elastic-agent/pkg/agent/application/enroll_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"os"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process"

"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/libbeat/common/backoff"
Expand Down Expand Up @@ -72,6 +74,19 @@ type EnrollCmd struct {
client clienter
configStore store
kibanaConfig *kibana.Config
agentProc *process.Info
}

// EnrollCmdFleetServerOption define all the supported enrollment options for bootstrapping with Fleet Server.
type EnrollCmdFleetServerOption struct {
ConnStr string
PolicyID string
Host string
Port uint16
Cert string
CertKey string
Insecure bool
SpawnAgent bool
}

// EnrollCmdOption define all the supported enrollment option.
Expand All @@ -84,13 +99,7 @@ type EnrollCmdOption struct {
UserProvidedMetadata map[string]interface{}
EnrollAPIKey string
Staging string
FleetServerConnStr string
FleetServerPolicyID string
FleetServerHost string
FleetServerPort uint16
FleetServerCert string
FleetServerCertKey string
FleetServerInsecure bool
FleetServer EnrollCmdFleetServerOption
}

func (e *EnrollCmdOption) kibanaConfig() (*kibana.Config, error) {
Expand Down Expand Up @@ -157,7 +166,8 @@ func NewEnrollCmdWithStore(
// Execute tries to enroll the agent into Fleet.
func (c *EnrollCmd) Execute(ctx context.Context) error {
var err error
if c.options.FleetServerConnStr != "" {
defer c.stopAgent() // ensure its stopped no matter what
if c.options.FleetServer.ConnStr != "" {
err = c.fleetServerBootstrap(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -185,18 +195,26 @@ func (c *EnrollCmd) Execute(ctx context.Context) error {
return errors.New(err, "fail to enroll")
}

if c.daemonReload(ctx) != nil {
c.log.Info("Elastic Agent might not be running; unable to trigger restart")
if c.agentProc == nil {
if c.daemonReload(ctx) != nil {
c.log.Info("Elastic Agent might not be running; unable to trigger restart")
}
c.log.Info("Successfully triggered restart on running Elastic Agent.")
return nil
}
c.log.Info("Successfully triggered restart on running Elastic Agent.")
c.log.Info("Elastic Agent has been enrolled; start Elastic Agent")
return nil
}

func (c *EnrollCmd) fleetServerBootstrap(ctx context.Context) error {
c.log.Debug("verifying communication with running Elastic Agent daemon")
agentRunning := true
_, err := getDaemonStatus(ctx)
if err != nil {
return errors.New("failed to communicate with elastic-agent daemon; is elastic-agent running?")
if !c.options.FleetServer.SpawnAgent {
return errors.New("failed to communicate with elastic-agent daemon; is elastic-agent running?")
}
agentRunning = false
}

err = c.prepareFleetTLS()
Expand All @@ -205,9 +223,9 @@ func (c *EnrollCmd) fleetServerBootstrap(ctx context.Context) error {
}

fleetConfig, err := createFleetServerBootstrapConfig(
c.options.FleetServerConnStr, c.options.FleetServerPolicyID,
c.options.FleetServerHost, c.options.FleetServerPort,
c.options.FleetServerCert, c.options.FleetServerCertKey)
c.options.FleetServer.ConnStr, c.options.FleetServer.PolicyID,
c.options.FleetServer.Host, c.options.FleetServer.Port,
c.options.FleetServer.Cert, c.options.FleetServer.CertKey)
configToStore := map[string]interface{}{
"fleet": fleetConfig,
}
Expand All @@ -219,9 +237,18 @@ func (c *EnrollCmd) fleetServerBootstrap(ctx context.Context) error {
return errors.New(err, "could not save fleet server bootstrap information", errors.TypeFilesystem)
}

err = c.daemonReload(ctx)
if err != nil {
return errors.New(err, "failed to trigger elastic-agent daemon reload", errors.TypeApplication)
if agentRunning {
// reload the already running agent
err = c.daemonReload(ctx)
if err != nil {
return errors.New(err, "failed to trigger elastic-agent daemon reload", errors.TypeApplication)
}
} else {
// spawn `run` as a subprocess so enroll can perform the bootstrap process of Fleet Server
err = c.startAgent()
if err != nil {
return err
}
}

err = waitForFleetServer(ctx, c.log)
Expand All @@ -232,25 +259,25 @@ func (c *EnrollCmd) fleetServerBootstrap(ctx context.Context) error {
}

func (c *EnrollCmd) prepareFleetTLS() error {
host := c.options.FleetServerHost
host := c.options.FleetServer.Host
if host == "" {
host = "localhost"
}
port := c.options.FleetServerPort
port := c.options.FleetServer.Port
if port == 0 {
port = defaultFleetServerPort
}
if c.options.FleetServerCert != "" && c.options.FleetServerCertKey == "" {
if c.options.FleetServer.Cert != "" && c.options.FleetServer.CertKey == "" {
return errors.New("certificate private key is required when certificate provided")
}
if c.options.FleetServerCertKey != "" && c.options.FleetServerCert == "" {
if c.options.FleetServer.CertKey != "" && c.options.FleetServer.Cert == "" {
return errors.New("certificate is required when certificate private key is provided")
}
if c.options.FleetServerCert == "" && c.options.FleetServerCertKey == "" {
if c.options.FleetServerInsecure {
if c.options.FleetServer.Cert == "" && c.options.FleetServer.CertKey == "" {
if c.options.FleetServer.Insecure {
// running insecure, force the binding to localhost (unless specified)
if c.options.FleetServerHost == "" {
c.options.FleetServerHost = "localhost"
if c.options.FleetServer.Host == "" {
c.options.FleetServer.Host = "localhost"
}
c.options.URL = fmt.Sprintf("http://%s:%d", host, port)
c.options.Insecure = true
Expand All @@ -270,8 +297,8 @@ func (c *EnrollCmd) prepareFleetTLS() error {
if err != nil {
return err
}
c.options.FleetServerCert = string(pair.Crt)
c.options.FleetServerCertKey = string(pair.Key)
c.options.FleetServer.Cert = string(pair.Crt)
c.options.FleetServer.CertKey = string(pair.Key)
c.options.URL = fmt.Sprintf("https://%s:%d", hostname, port)
c.options.CAs = []string{string(ca.Crt())}
}
Expand All @@ -295,8 +322,18 @@ func (c *EnrollCmd) enrollWithBackoff(ctx context.Context) error {
signal := make(chan struct{})
backExp := backoff.NewExpBackoff(signal, 60*time.Second, 10*time.Minute)

for errors.Is(err, fleetapi.ErrTooManyRequests) {
c.log.Warn("Too many requests on the remote server, will retry in a moment.")
for {
retry := false
if errors.Is(err, fleetapi.ErrTooManyRequests) {
c.log.Warn("Too many requests on the remote server, will retry in a moment.")
retry = true
} else if errors.Is(err, fleetapi.ErrConnRefused) {
c.log.Warn("Remote server is not ready to accept connections, will retry in a moment.")
retry = true
}
if !retry {
break
}
backExp.Wait()
c.log.Info("Retrying to enroll...")
err = c.enroll(ctx)
Expand Down Expand Up @@ -344,11 +381,11 @@ func (c *EnrollCmd) enroll(ctx context.Context) error {
"sourceURI": staging,
}
}
if c.options.FleetServerConnStr != "" {
if c.options.FleetServer.ConnStr != "" {
serverConfig, err := createFleetServerBootstrapConfig(
c.options.FleetServerConnStr, c.options.FleetServerPolicyID,
c.options.FleetServerHost, c.options.FleetServerPort,
c.options.FleetServerCert, c.options.FleetServerCertKey)
c.options.FleetServer.ConnStr, c.options.FleetServer.PolicyID,
c.options.FleetServer.Host, c.options.FleetServer.Port,
c.options.FleetServer.Cert, c.options.FleetServer.CertKey)
if err != nil {
return err
}
Expand Down Expand Up @@ -390,6 +427,27 @@ func (c *EnrollCmd) enroll(ctx context.Context) error {
return nil
}

func (c *EnrollCmd) startAgent() error {
cmd, err := os.Executable()
if err != nil {
return err
}
c.log.Info("Spawning Elastic Agent daemon as a subprocess to complete bootstrap process.")
proc, err := process.Start(c.log, cmd, nil, os.Geteuid(), os.Getegid(), "run")
if err != nil {
return err
}
c.agentProc = proc
return nil
}

func (c *EnrollCmd) stopAgent() {
if c.agentProc != nil {
c.agentProc.StopWait()
c.agentProc = nil
}
}

func yamlToReader(in interface{}) (io.Reader, error) {
data, err := yaml.Marshal(in)
if err != nil {
Expand Down Expand Up @@ -461,6 +519,10 @@ func waitForFleetServer(ctx context.Context, log *logger.Logger) error {
// app has started and is running
resChan <- waitResult{}
break
} else if app.Status == proto.Status_FAILED {
// app completely failed; exit now
resChan <- waitResult{err: errors.New(app.Message)}
break
}
if app.Message != "" {
appMsg := fmt.Sprintf("Fleet Server - %s", app.Message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ func (h *handlerPolicyChange) Handle(ctx context.Context, a action, acker fleetA
}

func (h *handlerPolicyChange) handleKibanaHosts(c *config.Config) (err error) {
// do not update kibana host from policy; no setters provided with local Fleet Server
if len(h.setters) == 0 {
return nil
}

cfg, err := configuration.NewFromConfig(c)
if err != nil {
return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig)
Expand Down
5 changes: 4 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ func newManaged(
agentInfo: agentInfo,
config: cfg,
store: store,
setters: []clientSetter{acker},
}
if cfg.Fleet.Server == nil {
// setters only set when not running a local Fleet Server
policyChanger.setters = []clientSetter{acker}
}
actionDispatcher.MustRegister(
&fleetapi.ActionPolicyChange{},
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func NewCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Command {
cmd.AddCommand(newEnrollCommandWithArgs(flags, args, streams))
cmd.AddCommand(newInspectCommandWithArgs(flags, args, streams))
cmd.AddCommand(newWatchCommandWithArgs(flags, args, streams))
cmd.AddCommand(newContainerCommand(flags, args, streams))

// windows special hidden sub-command (only added on windows)
reexec := newReExecWindowsCommand(flags, args, streams)
Expand Down
Loading

0 comments on commit 3647fef

Please sign in to comment.