Skip to content

Commit

Permalink
Switch the GRPC communication where Agent is running the server and t…
Browse files Browse the repository at this point in the history
…he beats are connecting back to Agent (#18973)

* Update libbeat fleet management to use elastic-agent-client.

* Work on switching to the new GRPC.

* More on refactor

* Add back the state.

* Format and check

* Add changelog.

* Cleanup.

* mage fmt in x-pack/libbeat.

* Update go.mod.

* Fix from review.

* Fix NewFromConfig comment

* Update go.mod

* Fix imports.

* Fix some locking issues from review.

* Fix lots of issues, add unit testing to cover restarts on failure reporting or crashes.

* Update go.sum

* Fix TestConfigurableRun.

* Fix range over registered apps in GRPC server using RWLock, switch to sync.Map.

* Fix config.

* Fix tests to work on Windows.

* Fix operation start to not start the same application twice.

* Fix enabling and disabling of monitoring.
  • Loading branch information
blakerouse authored Jun 10, 2020
1 parent 386716e commit 0c15394
Show file tree
Hide file tree
Showing 55 changed files with 1,072 additions and 1,719 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
*beat/logs
*beat/data
x-pack/functionbeat/pkg
x-pack/elastic-agent/data
x-pack/elastic-agent/pkg/agent/operation/tests/downloads

# Files
.DS_Store
Expand All @@ -25,6 +27,9 @@ mage_output_file.go
x-pack/functionbeat/*/fields.yml
x-pack/functionbeat/provider/*/functionbeat-*
x-pack/dockerlogbeat/temproot.tar
x-pack/elastic-agent/elastic_agent
x-pack/elastic-agent/fleet.yml
x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/configurable

# Editor swap files
*.swp
Expand Down
2 changes: 1 addition & 1 deletion libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
logp.Info("%s start running.", b.Info.Beat)

// Launch config manager
b.ConfigManager.Start()
b.ConfigManager.Start(beater.Stop)
defer b.ConfigManager.Stop()

return beater.Run(&b.Beat)
Expand Down
4 changes: 2 additions & 2 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type ConfigManager interface {
Enabled() bool

// Start the config manager
Start()
Start(func())

// Stop the config manager
Stop()
Expand Down Expand Up @@ -98,6 +98,6 @@ func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, err
}

func (nilManager) Enabled() bool { return false }
func (nilManager) Start() {}
func (nilManager) Start(_ func()) {}
func (nilManager) Stop() {}
func (nilManager) CheckRawConfig(cfg *common.Config) error { return nil }
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,4 @@
- Change monitoring defaults for agent {pull}18927[18927]
- Agent verifies packages before using them {pull}18876[18876]
- Change stream.* to dataset.* fields {pull}18967[18967]
- Agent now runs the GRPC server and spawned application connect by to Agent {pull}18973[18973]
13 changes: 9 additions & 4 deletions x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,18 @@ datasources:
# install_path: "${path.data}/install"

# process:
# # minimal port number for spawned processes
# min_port: 10000
# # maximum port number for spawned processes
# max_port: 30000
# # timeout for creating new processes. when process is not successfully created by this timeout
# # start operation is considered a failure
# spawn_timeout: 30s
# # timeout for stopping processes. when process is not stopped by this timeout then the process.
# # is force killed
# stop_timeout: 30s

# grpc:
# # listen address for the GRPC server that spawned processes connect back to.
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789

# retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
13 changes: 9 additions & 4 deletions x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,18 @@ datasources:
# install_path: "${path.data}/install"

# process:
# # minimal port number for spawned processes
# min_port: 10000
# # maximum port number for spawned processes
# max_port: 30000
# # timeout for creating new processes. when process is not successfully created by this timeout
# # start operation is considered a failure
# spawn_timeout: 30s
# # timeout for stopping processes. when process is not stopped by this timeout then the process.
# # is force killed
# stop_timeout: 30s

# grpc:
# # listen address for the GRPC server that spawned processes connect back to.
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789

# retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
13 changes: 9 additions & 4 deletions x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,18 @@ datasources:
# install_path: "${path.data}/install"

# process:
# # minimal port number for spawned processes
# min_port: 10000
# # maximum port number for spawned processes
# max_port: 30000
# # timeout for creating new processes. when process is not successfully created by this timeout
# # start operation is considered a failure
# spawn_timeout: 30s
# # timeout for stopping processes. when process is not stopped by this timeout then the process.
# # is force killed
# stop_timeout: 30s

# grpc:
# # listen address for the GRPC server that spawned processes connect back to.
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789

# retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
13 changes: 9 additions & 4 deletions x-pack/elastic-agent/elastic-agent.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,18 @@ datasources:
# install_path: "${path.data}/install"

# process:
# # minimal port number for spawned processes
# min_port: 10000
# # maximum port number for spawned processes
# max_port: 30000
# # timeout for creating new processes. when process is not successfully created by this timeout
# # start operation is considered a failure
# spawn_timeout: 30s
# # timeout for stopping processes. when process is not stopped by this timeout then the process.
# # is force killed
# stop_timeout: 30s

# grpc:
# # listen address for the GRPC server that spawned processes connect back to.
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789

# retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
13 changes: 9 additions & 4 deletions x-pack/elastic-agent/elastic-agent.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,18 @@ datasources:
# install_path: "${path.data}/install"

# process:
# # minimal port number for spawned processes
# min_port: 10000
# # maximum port number for spawned processes
# max_port: 30000
# # timeout for creating new processes. when process is not successfully created by this timeout
# # start operation is considered a failure
# spawn_timeout: 30s
# # timeout for stopping processes. when process is not stopped by this timeout then the process.
# # is force killed
# stop_timeout: 30s

# grpc:
# # listen address for the GRPC server that spawned processes connect back to.
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789

# retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
13 changes: 9 additions & 4 deletions x-pack/elastic-agent/elastic-agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,18 @@ datasources:
# install_path: "${path.data}/install"

# process:
# # minimal port number for spawned processes
# min_port: 10000
# # maximum port number for spawned processes
# max_port: 30000
# # timeout for creating new processes. when process is not successfully created by this timeout
# # start operation is considered a failure
# spawn_timeout: 30s
# # timeout for stopping processes. when process is not stopped by this timeout then the process.
# # is force killed
# stop_timeout: 30s

# grpc:
# # listen address for the GRPC server that spawned processes connect back to.
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789

# retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
7 changes: 6 additions & 1 deletion x-pack/elastic-agent/magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -165,8 +166,12 @@ func (Build) Clean() {
func (Build) TestBinaries() error {
p := filepath.Join("pkg", "agent", "operation", "tests", "scripts")

binaryName := "configurable"
if runtime.GOOS == "windows" {
binaryName += ".exe"
}
return combineErr(
RunGo("build", "-o", filepath.Join(p, "configurable-1.0-darwin-x86", "configurable"), filepath.Join(p, "configurable-1.0-darwin-x86", "main.go")),
RunGo("build", "-o", filepath.Join(p, "configurable-1.0-darwin-x86_64", binaryName), filepath.Join(p, "configurable-1.0-darwin-x86_64", "main.go")),
)
}

Expand Down
13 changes: 12 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/dir"
reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
logreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/log"
Expand All @@ -39,6 +41,7 @@ type Local struct {
log *logger.Logger
source source
agentInfo *info.AgentInfo
srv *server.Server
}

type source interface {
Expand Down Expand Up @@ -78,6 +81,10 @@ func newLocal(
}

localApplication.bgContext, localApplication.cancelCtxFn = context.WithCancel(ctx)
localApplication.srv, err = server.NewFromConfig(log, rawConfig, &app.ApplicationStatusHandler{})
if err != nil {
return nil, errors.New(err, "initialize GRPC listener")
}

reporter := reporting.NewReporter(localApplication.bgContext, log, localApplication.agentInfo, logR)

Expand All @@ -86,7 +93,7 @@ func newLocal(
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(localApplication.bgContext, rawConfig, nil, reporter, monitor))
router, err := newRouter(log, streamFactory(localApplication.bgContext, rawConfig, localApplication.srv, reporter, monitor))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
Expand All @@ -113,6 +120,9 @@ func (l *Local) Start() error {
l.log.Info("Agent is starting")
defer l.log.Info("Agent is stopped")

if err := l.srv.Start(); err != nil {
return err
}
if err := l.source.Start(); err != nil {
return err
}
Expand All @@ -123,6 +133,7 @@ func (l *Local) Start() error {
// Stop stops a local agent.
func (l *Local) Stop() error {
l.cancelCtxFn()
l.srv.Stop()
return l.source.Stop()
}

Expand Down
14 changes: 13 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (
"net/http"
"net/url"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
fleetreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/fleet"
Expand All @@ -44,6 +47,7 @@ type Managed struct {
api apiClient
agentInfo *info.AgentInfo
gateway *fleetGateway
srv *server.Server
}

func newManaged(
Expand Down Expand Up @@ -110,6 +114,10 @@ func newManaged(
}

managedApplication.bgContext, managedApplication.cancelCtxFn = context.WithCancel(ctx)
managedApplication.srv, err = server.NewFromConfig(log, rawConfig, &app.ApplicationStatusHandler{})
if err != nil {
return nil, errors.New(err, "initialize GRPC listener")
}

logR := logreporter.NewReporter(log, cfg.Reporting.Log)
fleetR, err := fleetreporter.NewReporter(agentInfo, log, cfg.Reporting.Fleet)
Expand All @@ -123,7 +131,7 @@ func newManaged(
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(managedApplication.bgContext, rawConfig, client, combinedReporter, monitor))
router, err := newRouter(log, streamFactory(managedApplication.bgContext, rawConfig, managedApplication.srv, combinedReporter, monitor))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
Expand Down Expand Up @@ -200,6 +208,9 @@ func newManaged(
// Start starts a managed elastic-agent.
func (m *Managed) Start() error {
m.log.Info("Agent is starting")
if err := m.srv.Start(); err != nil {
return err
}
m.gateway.Start()
return nil
}
Expand All @@ -208,6 +219,7 @@ func (m *Managed) Start() error {
func (m *Managed) Stop() error {
defer m.log.Info("Agent is stopped")
m.cancelCtxFn()
m.srv.Stop()
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
)

// EventProcessor is an processor of application event
Expand Down Expand Up @@ -56,10 +57,10 @@ func (b *operatorStream) Execute(cfg *configRequest) error {
return b.configHandler.HandleConfig(cfg)
}

func streamFactory(ctx context.Context, cfg *config.Config, client sender, r reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
func streamFactory(ctx context.Context, cfg *config.Config, srv *server.Server, r reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
return func(log *logger.Logger, id routingKey) (stream, error) {
// new operator per stream to isolate processes without using tags
operator, err := newOperator(ctx, log, id, cfg, r, m)
operator, err := newOperator(ctx, log, id, cfg, srv, r, m)
if err != nil {
return nil, err
}
Expand All @@ -71,7 +72,7 @@ func streamFactory(ctx context.Context, cfg *config.Config, client sender, r rep
}
}

func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, r reporter, m monitoring.Monitor) (*operation.Operator, error) {
func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, srv *server.Server, r reporter, m monitoring.Monitor) (*operation.Operator, error) {
operatorConfig := operatorCfg.DefaultConfig()
if err := config.Unpack(&operatorConfig); err != nil {
return nil, err
Expand Down Expand Up @@ -102,6 +103,7 @@ func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config
verifier,
installer,
stateResolver,
srv,
r,
m,
)
Expand Down
Loading

0 comments on commit 0c15394

Please sign in to comment.