Skip to content

Commit

Permalink
Refactor: Move actions into separate packages (elastic#24702)
Browse files Browse the repository at this point in the history
Refactor: Move actions into separate packages  (elastic#24702)
  • Loading branch information
michalpristas authored Mar 23, 2021
1 parent 6c9386d commit 34d343c
Show file tree
Hide file tree
Showing 42 changed files with 838 additions and 801 deletions.
53 changes: 0 additions & 53 deletions x-pack/elastic-agent/pkg/agent/application/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,63 +5,10 @@
package application

import (
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/kibana"
)

type localConfig struct {
Fleet *configuration.FleetAgentConfig `config:"fleet"`
Settings *configuration.SettingsConfig `config:"agent" yaml:"agent"`
}

func createFleetConfigFromEnroll(accessAPIKey string, kbn *kibana.Config) (*configuration.FleetAgentConfig, error) {
cfg := configuration.DefaultFleetAgentConfig()
cfg.Enabled = true
cfg.AccessAPIKey = accessAPIKey
cfg.Kibana = kbn

if err := cfg.Valid(); err != nil {
return nil, errors.New(err, "invalid enrollment options", errors.TypeConfig)
}
return cfg, nil
}

func createFleetServerBootstrapConfig(connStr string, policyID string, host string, port uint16, cert string, key string, esCA string) (*configuration.FleetAgentConfig, error) {
es, err := configuration.ElasticsearchFromConnStr(connStr)
if err != nil {
return nil, err
}
if esCA != "" {
es.TLS = &tlscommon.Config{
CAs: []string{esCA},
}
}
cfg := configuration.DefaultFleetAgentConfig()
cfg.Enabled = true
cfg.Server = &configuration.FleetServerConfig{
Bootstrap: true,
Output: configuration.FleetServerOutputConfig{
Elasticsearch: es,
},
Host: host,
Port: port,
}
if policyID != "" {
cfg.Server.Policy = &configuration.FleetServerPolicyConfig{ID: policyID}
}
if cert != "" || key != "" {
cfg.Server.TLS = &tlscommon.Config{
Certificate: tlscommon.CertificateConfig{
Certificate: cert,
Key: key,
},
}
}

if err := cfg.Valid(); err != nil {
return nil, errors.New(err, "invalid enrollment options", errors.TypeConfig)
}
return cfg, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/pipeline"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/pipeline/emitter/modifiers"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/pipeline/router"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/pipeline/stream"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
Expand All @@ -36,7 +37,7 @@ type FleetServerBootstrap struct {
log *logger.Logger
Config configuration.FleetAgentConfig
agentInfo *info.AgentInfo
router pipeline.Dispatcher
router pipeline.Router
source source
srv *server.Server
}
Expand Down Expand Up @@ -100,7 +101,7 @@ func newFleetServerBootstrap(
agentInfo,
router,
&pipeline.ConfigModifiers{
Filters: []pipeline.FilterFunc{filters.StreamChecker, injectFleet(rawConfig, sysInfo.Info(), agentInfo)},
Filters: []pipeline.FilterFunc{filters.StreamChecker, modifiers.InjectFleet(rawConfig, sysInfo.Info(), agentInfo)},
},
)
if err != nil {
Expand Down Expand Up @@ -141,7 +142,7 @@ func (b *FleetServerBootstrap) AgentInfo() *info.AgentInfo {
return b.agentInfo
}

func bootstrapEmitter(ctx context.Context, log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Dispatcher, modifiers *pipeline.ConfigModifiers) (pipeline.EmitterFunc, error) {
func bootstrapEmitter(ctx context.Context, log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Router, modifiers *pipeline.ConfigModifiers) (pipeline.EmitterFunc, error) {
ch := make(chan *config.Config)

go func() {
Expand All @@ -166,7 +167,7 @@ func bootstrapEmitter(ctx context.Context, log *logger.Logger, agentInfo transpi
}, nil
}

func emit(log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Dispatcher, modifiers *pipeline.ConfigModifiers, c *config.Config) error {
func emit(log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Router, modifiers *pipeline.ConfigModifiers, c *config.Config) error {
if err := info.InjectAgentConfig(c); err != nil {
return err
}
Expand Down Expand Up @@ -205,7 +206,7 @@ func emit(log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Di
return errors.New("bootstrap configuration is incorrect causing fleet-server to not be started")
}

return router.Dispatch(ast.HashStr(), map[pipeline.RoutingKey][]program.Program{
return router.Route(ast.HashStr(), map[pipeline.RoutingKey][]program.Program{
pipeline.DefaultRK: {
{
Spec: spec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application
package fleet

import (
"context"
Expand All @@ -14,6 +14,9 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi/client"

"github.com/elastic/beats/v7/libbeat/common/backoff"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/gateway"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/pipeline"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage/store"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -45,12 +48,6 @@ type backoffSettings struct {
Max time.Duration `config:"max"`
}

type fleetAcker = store.FleetAcker

type dispatcher interface {
Dispatch(acker fleetAcker, actions ...action) error
}

type agentInfo interface {
AgentID() string
}
Expand All @@ -59,18 +56,6 @@ type fleetReporter interface {
Events() ([]fleetapi.SerializableEvent, func())
}

// FleetGateway is a gateway between the Agent and the Fleet API, it's take cares of all the
// bidirectional communication requirements. The gateway aggregates events and will periodically
// call the API to send the events and will receive actions to be executed locally.
// The only supported action for now is a "ActionPolicyChange".
type FleetGateway interface {
// Start starts the gateway.
Start() error

// Set the client for the gateway.
SetClient(client.Sender)
}

type stateStore interface {
Add(fleetapi.Action)
AckToken() string
Expand All @@ -82,7 +67,7 @@ type stateStore interface {
type fleetGateway struct {
bgContext context.Context
log *logger.Logger
dispatcher dispatcher
dispatcher pipeline.Dispatcher
client client.Sender
scheduler scheduler.Scheduler
backoff backoff.Backoff
Expand All @@ -91,24 +76,25 @@ type fleetGateway struct {
reporter fleetReporter
done chan struct{}
wg sync.WaitGroup
acker fleetAcker
acker store.FleetAcker
unauthCounter int
statusController status.Controller
statusReporter status.Reporter
stateStore stateStore
}

func newFleetGateway(
// New creates a new fleet gateway
func New(
ctx context.Context,
log *logger.Logger,
agentInfo agentInfo,
client client.Sender,
d dispatcher,
d pipeline.Dispatcher,
r fleetReporter,
acker fleetAcker,
acker store.FleetAcker,
statusController status.Controller,
stateStore stateStore,
) (FleetGateway, error) {
) (gateway.FleetGateway, error) {

scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
return newFleetGatewayWithScheduler(
Expand All @@ -132,13 +118,13 @@ func newFleetGatewayWithScheduler(
settings *fleetGatewaySettings,
agentInfo agentInfo,
client client.Sender,
d dispatcher,
d pipeline.Dispatcher,
scheduler scheduler.Scheduler,
r fleetReporter,
acker fleetAcker,
acker store.FleetAcker,
statusController status.Controller,
stateStore stateStore,
) (FleetGateway, error) {
) (gateway.FleetGateway, error) {

// Backoff implementation doesn't support the using context as the shutdown mechanism.
// So we keep a done channel that will be closed when the current context is shutdown.
Expand Down Expand Up @@ -182,7 +168,7 @@ func (f *fleetGateway) worker() {
continue
}

actions := make([]action, len(resp.Actions))
actions := make([]fleetapi.Action, len(resp.Actions))
for idx, a := range resp.Actions {
actions[idx] = a
}
Expand Down Expand Up @@ -234,7 +220,7 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
// get events
ee, ack := f.reporter.Events()

ecsMeta, err := metadata()
ecsMeta, err := info.Metadata()
if err != nil {
f.log.Error(errors.New("failed to load metadata", err))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application
package fleet

import (
"bytes"
Expand All @@ -20,10 +20,12 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/gateway"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage/store"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
noopacker "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi/acker/noop"
repo "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
fleetreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/fleet"
Expand Down Expand Up @@ -68,15 +70,15 @@ func newTestingClient() *testingClient {
return &testingClient{received: make(chan struct{}, 1)}
}

type testingDispatcherFunc func(...action) error
type testingDispatcherFunc func(...fleetapi.Action) error

type testingDispatcher struct {
sync.Mutex
callback testingDispatcherFunc
received chan struct{}
}

func (t *testingDispatcher) Dispatch(acker fleetAcker, actions ...action) error {
func (t *testingDispatcher) Dispatch(acker store.FleetAcker, actions ...fleetapi.Action) error {
t.Lock()
defer t.Unlock()
defer func() { t.received <- struct{}{} }()
Expand Down Expand Up @@ -108,7 +110,7 @@ func newTestingDispatcher() *testingDispatcher {
return &testingDispatcher{received: make(chan struct{}, 1)}
}

type withGatewayFunc func(*testing.T, FleetGateway, *testingClient, *testingDispatcher, *scheduler.Stepper, repo.Backend)
type withGatewayFunc func(*testing.T, gateway.FleetGateway, *testingClient, *testingDispatcher, *scheduler.Stepper, repo.Backend)

func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGatewayFunc) func(t *testing.T) {
return func(t *testing.T) {
Expand Down Expand Up @@ -175,7 +177,7 @@ func TestFleetGateway(t *testing.T) {

t.Run("send no event and receive no action", withGateway(agentInfo, settings, func(
t *testing.T,
gateway FleetGateway,
gateway gateway.FleetGateway,
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
Expand All @@ -186,7 +188,7 @@ func TestFleetGateway(t *testing.T) {
resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}),
dispatcher.Answer(func(actions ...action) error {
dispatcher.Answer(func(actions ...fleetapi.Action) error {
require.Equal(t, 0, len(actions))
return nil
}),
Expand All @@ -200,7 +202,7 @@ func TestFleetGateway(t *testing.T) {

t.Run("Successfully connects and receives a series of actions", withGateway(agentInfo, settings, func(
t *testing.T,
gateway FleetGateway,
gateway gateway.FleetGateway,
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
Expand Down Expand Up @@ -230,7 +232,7 @@ func TestFleetGateway(t *testing.T) {
`)
return resp, nil
}),
dispatcher.Answer(func(actions ...action) error {
dispatcher.Answer(func(actions ...fleetapi.Action) error {
require.Equal(t, 2, len(actions))
return nil
}),
Expand Down Expand Up @@ -275,7 +277,7 @@ func TestFleetGateway(t *testing.T) {
resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}),
dispatcher.Answer(func(actions ...action) error {
dispatcher.Answer(func(actions ...fleetapi.Action) error {
require.Equal(t, 0, len(actions))
return nil
}),
Expand All @@ -295,7 +297,7 @@ func TestFleetGateway(t *testing.T) {

t.Run("send event and receive no action", withGateway(agentInfo, settings, func(
t *testing.T,
gateway FleetGateway,
gateway gateway.FleetGateway,
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
Expand All @@ -319,7 +321,7 @@ func TestFleetGateway(t *testing.T) {
resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}),
dispatcher.Answer(func(actions ...action) error {
dispatcher.Answer(func(actions ...fleetapi.Action) error {
require.Equal(t, 0, len(actions))
return nil
}),
Expand Down Expand Up @@ -364,7 +366,7 @@ func TestFleetGateway(t *testing.T) {

require.NoError(t, err)

ch1 := dispatcher.Answer(func(actions ...action) error { return nil })
ch1 := dispatcher.Answer(func(actions ...fleetapi.Action) error { return nil })
ch2 := client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
Expand Down Expand Up @@ -407,7 +409,7 @@ func TestRetriesOnFailures(t *testing.T) {
t.Run("When the gateway fails to communicate with the checkin API we will retry",
withGateway(agentInfo, settings, func(
t *testing.T,
gateway FleetGateway,
gateway gateway.FleetGateway,
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
Expand Down Expand Up @@ -448,7 +450,7 @@ func TestRetriesOnFailures(t *testing.T) {
return resp, nil
}),

dispatcher.Answer(func(actions ...action) error {
dispatcher.Answer(func(actions ...fleetapi.Action) error {
require.Equal(t, 0, len(actions))
return nil
}),
Expand All @@ -463,7 +465,7 @@ func TestRetriesOnFailures(t *testing.T) {
Backoff: backoffSettings{Init: 10 * time.Minute, Max: 20 * time.Minute},
}, func(
t *testing.T,
gateway FleetGateway,
gateway gateway.FleetGateway,
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application
package fleet

import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
Expand Down
Loading

0 comments on commit 34d343c

Please sign in to comment.