Skip to content

Commit

Permalink
Expose fleet connectivity state separately (#2239)
Browse files Browse the repository at this point in the history
Expose fleet connectivity state separately  (#2239)
  • Loading branch information
michalpristas authored Feb 8, 2023
1 parent 237e608 commit effacff
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 147 deletions.
4 changes: 4 additions & 0 deletions control_v2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ message StateResponse {
string message = 3;
// State of each component in Elastic Agent.
repeated ComponentState components = 4;
// Fleet connectivity state of Elastic Agent.
State fleetState = 5;
// Fleet connectivity state message of Elastic Agent.
string fleetMessage = 6;
}

// DiagnosticFileResult is a file result from a diagnostic result.
Expand Down
4 changes: 3 additions & 1 deletion internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func New(
var managed *managedConfigManager
var compModifiers []coordinator.ComponentsModifier
var composableManaged bool
var isManaged bool
if configuration.IsStandalone(cfg.Fleet) {
log.Info("Parsed configuration and determined agent is managed locally")

Expand All @@ -96,6 +97,7 @@ func New(
configMgr = newPeriodic(log, cfg.Settings.Reload.Period, discover, loader)
}
} else {
isManaged = true
var store storage.Store
store, cfg, err = mergeFleetConfig(rawConfig)
if err != nil {
Expand Down Expand Up @@ -127,7 +129,7 @@ func New(
return nil, errors.New(err, "failed to initialize composable controller")
}

coord := coordinator.New(log, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, composable, caps, monitor, compModifiers...)
coord := coordinator.New(log, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, composable, caps, monitor, isManaged, compModifiers...)
if managed != nil {
// the coordinator requires the config manager as well as in managed-mode the config manager requires the
// coordinator, so it must be set here once the coordinator is created
Expand Down
42 changes: 33 additions & 9 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
agentclient "github.com/elastic/elastic-agent/internal/pkg/agent/control/v2/client"
"github.com/elastic/elastic-agent/internal/pkg/agent/control/v2/cproto"
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
"github.com/elastic/elastic-agent/internal/pkg/config"
Expand Down Expand Up @@ -148,10 +149,12 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa

// State provides the current state of the coordinator along with all the current states of components and units.
type State struct {
State agentclient.State `yaml:"state"`
Message string `yaml:"message"`
Components []runtime.ComponentComponentState `yaml:"components"`
LogLevel logp.Level `yaml:"log_level"`
State agentclient.State `yaml:"state"`
Message string `yaml:"message"`
FleetState agentclient.State `yaml:"fleet_state"`
FleetMessage string `yaml:"fleet_message"`
Components []runtime.ComponentComponentState `yaml:"components"`
LogLevel logp.Level `yaml:"log_level"`
}

// StateFetcher provides an interface to fetch the current state of the coordinator.
Expand Down Expand Up @@ -190,7 +193,12 @@ type Coordinator struct {
}

// New creates a new coordinator.
func New(logger *logger.Logger, logLevel logp.Level, agentInfo *info.AgentInfo, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capability, monitorMgr MonitorManager, modifiers ...ComponentsModifier) *Coordinator {
func New(logger *logger.Logger, logLevel logp.Level, agentInfo *info.AgentInfo, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capability, monitorMgr MonitorManager, isManaged bool, modifiers ...ComponentsModifier) *Coordinator {
var fleetState cproto.State
if !isManaged {
// default enum value is STARTING which is confusing for standalone
fleetState = agentclient.Stopped
}
return &Coordinator{
logger: logger,
agentInfo: agentInfo,
Expand All @@ -204,8 +212,9 @@ func New(logger *logger.Logger, logLevel logp.Level, agentInfo *info.AgentInfo,
caps: caps,
modifiers: modifiers,
state: coordinatorState{
state: agentclient.Starting,
logLevel: logLevel,
state: agentclient.Starting,
fleetState: fleetState,
logLevel: logLevel,
},
monitorMgr: monitorMgr,
}
Expand All @@ -216,6 +225,10 @@ func New(logger *logger.Logger, logLevel logp.Level, agentInfo *info.AgentInfo,
func (c *Coordinator) State(local bool) (s State) {
s.State = c.state.state
s.Message = c.state.message

s.FleetState = c.state.fleetState
s.FleetMessage = c.state.fleetMessage

s.Components = c.runtimeMgr.State()
s.LogLevel = c.state.logLevel
if c.state.overrideState != nil {
Expand All @@ -231,8 +244,8 @@ func (c *Coordinator) State(local bool) (s State) {
s.State = agentclient.Failed
s.Message = c.runtimeMgrErr.Error()
} else if local && c.configMgrErr != nil {
s.State = agentclient.Failed
s.Message = c.configMgrErr.Error()
s.FleetState = agentclient.Failed
s.FleetMessage = c.configMgrErr.Error()
} else if c.actionsErr != nil {
s.State = agentclient.Failed
s.Message = c.actionsErr.Error()
Expand Down Expand Up @@ -401,6 +414,8 @@ func (c *Coordinator) Run(ctx context.Context) error {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
c.state.state = agentclient.Stopped
c.state.message = "Requested to be stopped"
c.state.fleetState = agentclient.Stopped
c.state.fleetMessage = "Requested to be stopped"
// do not restart
return err
}
Expand Down Expand Up @@ -592,6 +607,13 @@ func (c *Coordinator) runner(ctx context.Context) error {
case runtimeErr := <-c.runtimeMgr.Errors():
c.runtimeMgrErr = runtimeErr
case configErr := <-c.configMgr.Errors():
if configErr == nil {
c.state.fleetState = agentclient.Healthy
c.state.fleetMessage = ""
} else {
c.state.fleetState = agentclient.Failed
c.state.fleetMessage = configErr.Error()
}
c.configMgrErr = configErr
case actionsErr := <-c.configMgr.ActionErrors():
c.actionsErr = actionsErr
Expand Down Expand Up @@ -779,6 +801,8 @@ func (c *Coordinator) compute() (map[string]interface{}, []component.Component,
type coordinatorState struct {
state agentclient.State
message string
fleetState agentclient.State
fleetMessage string
overrideState *coordinatorOverrideState

config *config.Config
Expand Down
10 changes: 10 additions & 0 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type fleetGateway struct {
checkinFailCounter int
stateFetcher coordinator.StateFetcher
stateStore stateStore
errCh chan error
actionCh chan []fleetapi.Action
}

Expand Down Expand Up @@ -118,6 +119,7 @@ func newFleetGatewayWithScheduler(
acker: acker,
stateFetcher: stateFetcher,
stateStore: stateStore,
errCh: make(chan error),
actionCh: make(chan []fleetapi.Action, 1),
}, nil
}
Expand Down Expand Up @@ -167,6 +169,11 @@ func (f *fleetGateway) Run(ctx context.Context) error {
}
}

// Errors returns the channel to watch for reported errors.
func (f *fleetGateway) Errors() <-chan error {
return f.errCh
}

func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*fleetapi.CheckinResponse, error) {
bo.Reset()

Expand Down Expand Up @@ -198,8 +205,10 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
)

f.log.Error(err)
f.errCh <- err
return nil, err
}
f.errCh <- err
continue
}

Expand All @@ -209,6 +218,7 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
}

f.checkinFailCounter = 0
f.errCh <- nil
// Request was successful, return the collected actions.
return resp, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package fleet
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -23,6 +22,7 @@ import (

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
Expand Down Expand Up @@ -404,15 +404,27 @@ func (e *emptyStateFetcher) State(_ bool) coordinator.State {
}

func runFleetGateway(ctx context.Context, g gateway.FleetGateway) <-chan error {
done := make(chan bool)
errCh := make(chan error, 1)
go func() {
err := g.Run(ctx)
close(done)
if err != nil && !errors.Is(err, context.Canceled) {
errCh <- err
} else {
errCh <- nil
}
}()
go func() {
for {
select {
case <-done:
return
case <-g.Errors():
// ignore errors here
}
}
}()
return errCh
}

Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/agent/application/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type FleetGateway interface {
// Run runs the gateway.
Run(ctx context.Context) error

// Errors returns the channel to watch for reported errors.
Errors() <-chan error

// Actions returns the channel to watch for new actions from the fleet-server.
Actions() <-chan []fleetapi.Action

Expand Down
13 changes: 13 additions & 0 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,21 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
policyChanger.AddSetter(ack)
}

// Proxy errors from the gateway to our own channel.
gatewayErrorsRunner := runner.Start(context.Background(), func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case err := <-gateway.Errors():
m.errCh <- err
}
}
})

// Run the gateway.
gatewayRunner := runner.Start(gatewayCtx, func(ctx context.Context) error {
defer gatewayErrorsRunner.Stop()
return gateway.Run(ctx)
})

Expand Down
6 changes: 6 additions & 0 deletions internal/pkg/agent/cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func outputState(w io.Writer, state *client.AgentState) error {
} else {
fmt.Fprintf(w, "Message: %s\n", state.Message)
}
fmt.Fprintf(w, "Fleet State: %s\n", state.FleetState)
if state.FleetMessage == "" {
fmt.Fprint(w, "Fleet Message: (no message)\n")
} else {
fmt.Fprintf(w, "Fleet Message: %s\n", state.FleetMessage)
}
if len(state.Components) == 0 {
fmt.Fprint(w, "Components: (none)\n")
} else {
Expand Down
17 changes: 11 additions & 6 deletions internal/pkg/agent/control/v2/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,12 @@ type AgentStateInfo struct {

// AgentState is the current state of the Elastic Agent.
type AgentState struct {
Info AgentStateInfo `json:"info" yaml:"info"`
State State `json:"state" yaml:"state"`
Message string `json:"message" yaml:"message"`
Components []ComponentState `json:"components" yaml:"components"`
Info AgentStateInfo `json:"info" yaml:"info"`
State State `json:"state" yaml:"state"`
Message string `json:"message" yaml:"message"`
Components []ComponentState `json:"components" yaml:"components"`
FleetState State `yaml:"fleet_state"`
FleetMessage string `yaml:"fleet_message"`
}

// DiagnosticFileResult is a diagnostic file result.
Expand Down Expand Up @@ -226,8 +228,11 @@ func (c *client) State(ctx context.Context) (*AgentState, error) {
BuildTime: res.Info.BuildTime,
Snapshot: res.Info.Snapshot,
},
State: res.State,
Message: res.Message,
State: res.State,
Message: res.Message,
FleetState: res.FleetState,
FleetMessage: res.FleetMessage,

Components: make([]ComponentState, 0, len(res.Components)),
}
for _, comp := range res.Components {
Expand Down
Loading

0 comments on commit effacff

Please sign in to comment.