Skip to content

Commit

Permalink
Implement basic version negotiation with Fleet Server (#3383)
Browse files Browse the repository at this point in the history
* Initial implementation of elastic api version in roundtrip and fleet client
* signal Fleet warning on elastic-agent status
  • Loading branch information
pchila authored Sep 12, 2023
1 parent 342cda7 commit fe0d427
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 33 deletions.
6 changes: 1 addition & 5 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,4 @@ inpackage: False
testonly: False
with-expecter: True
keeptree: True
case: underscore
note: |
Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
case: underscore
4 changes: 4 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,8 +855,12 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {

case configErr := <-c.managerChans.configManagerError:
if c.isManaged {
var wErr *WarningError
if configErr == nil {
c.setFleetState(agentclient.Healthy, "Connected")
} else if errors.As(configErr, &wErr) {
// we received a warning from Fleet, set state to degraded and the warning as state string
c.setFleetState(agentclient.Degraded, wErr.Error())
} else {
c.setFleetState(agentclient.Failed, configErr.Error())
}
Expand Down
14 changes: 14 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,20 @@ func TestCoordinator_State_ConfigError_Managed(t *testing.T) {
return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Healthy && state.FleetMessage == "Connected"
}, 3*time.Second, 10*time.Millisecond)

// report a warning
cfgMgr.ReportError(ctx, NewWarningError("some msg from Fleet"))
assert.Eventually(t, func() bool {
state := coord.State()
return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Degraded && state.FleetMessage == "some msg from Fleet"
}, 3*time.Second, 10*time.Millisecond)

// recover from warning error
cfgMgr.ReportError(ctx, nil)
assert.Eventually(t, func() bool {
state := coord.State()
return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Healthy && state.FleetMessage == "Connected"
}, 3*time.Second, 10*time.Millisecond)

cancel()
err = <-coordCh
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package gateway
package coordinator

import (
"context"
Expand All @@ -28,3 +28,16 @@ type FleetGateway interface {
// SetClient sets the client for the gateway.
SetClient(client.Sender)
}

// WarningError is emitted when we receive a warning in the Fleet response
type WarningError struct {
msg string
}

func (w WarningError) Error() string {
return w.msg
}

func NewWarningError(warningMsg string) *WarningError {
return &WarningError{msg: warningMsg}
}
32 changes: 18 additions & 14 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

eaclient "github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
Expand Down Expand Up @@ -66,7 +65,7 @@ type stateStore interface {
Actions() []fleetapi.Action
}

type fleetGateway struct {
type FleetGateway struct {
log *logger.Logger
client client.Sender
scheduler scheduler.Scheduler
Expand All @@ -89,7 +88,7 @@ func New(
acker acker.Acker,
stateFetcher func() coordinator.State,
stateStore stateStore,
) (gateway.FleetGateway, error) {
) (*FleetGateway, error) {

scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
return newFleetGatewayWithScheduler(
Expand All @@ -113,8 +112,8 @@ func newFleetGatewayWithScheduler(
acker acker.Acker,
stateFetcher func() coordinator.State,
stateStore stateStore,
) (gateway.FleetGateway, error) {
return &fleetGateway{
) (*FleetGateway, error) {
return &FleetGateway{
log: log,
client: client,
settings: settings,
Expand All @@ -128,11 +127,11 @@ func newFleetGatewayWithScheduler(
}, nil
}

func (f *fleetGateway) Actions() <-chan []fleetapi.Action {
func (f *FleetGateway) Actions() <-chan []fleetapi.Action {
return f.actionCh
}

func (f *fleetGateway) Run(ctx context.Context) error {
func (f *FleetGateway) Run(ctx context.Context) error {
// Backoff implementation doesn't support the use of a context [cancellation] as the shutdown mechanism.
// So we keep a done channel that will be closed when the current context is shutdown.
done := make(chan struct{})
Expand Down Expand Up @@ -174,11 +173,11 @@ func (f *fleetGateway) Run(ctx context.Context) error {
}

// Errors returns the channel to watch for reported errors.
func (f *fleetGateway) Errors() <-chan error {
func (f *FleetGateway) Errors() <-chan error {
return f.errCh
}

func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*fleetapi.CheckinResponse, error) {
func (f *FleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*fleetapi.CheckinResponse, error) {
bo.Reset()

// Guard if the context is stopped by a out of bound call,
Expand Down Expand Up @@ -222,7 +221,12 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
}

f.checkinFailCounter = 0
f.errCh <- nil
if resp.FleetWarning != "" {
f.errCh <- coordinator.NewWarningError(resp.FleetWarning)
} else {
f.errCh <- nil
}

// Request was successful, return the collected actions.
return resp, nil
}
Expand All @@ -232,7 +236,7 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
return nil, ctx.Err()
}

func (f *fleetGateway) convertToCheckinComponents(components []runtime.ComponentComponentState) []fleetapi.CheckinComponent {
func (f *FleetGateway) convertToCheckinComponents(components []runtime.ComponentComponentState) []fleetapi.CheckinComponent {
if components == nil {
return nil
}
Expand Down Expand Up @@ -307,7 +311,7 @@ func (f *fleetGateway) convertToCheckinComponents(components []runtime.Component
return checkinComponents
}

func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) {
func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) {
ecsMeta, err := info.Metadata(ctx, f.log)
if err != nil {
f.log.Error(errors.New("failed to load metadata", err))
Expand Down Expand Up @@ -367,15 +371,15 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
}

// shouldUnenroll checks if the max number of trying an invalid key is reached
func (f *fleetGateway) shouldUnenroll() bool {
func (f *FleetGateway) shouldUnenroll() bool {
return f.unauthCounter > maxUnauthCounter
}

func isUnauth(err error) bool {
return errors.Is(err, client.ErrInvalidAPIKey)
}

func (f *fleetGateway) SetClient(c client.Sender) {
func (f *FleetGateway) SetClient(c client.Sender) {
f.client = c
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"gotest.tools/assert"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
Expand Down Expand Up @@ -70,7 +69,7 @@ func newTestingClient() *testingClient {
return &testingClient{received: make(chan struct{}, 1)}
}

type withGatewayFunc func(*testing.T, gateway.FleetGateway, *testingClient, *scheduler.Stepper)
type withGatewayFunc func(*testing.T, coordinator.FleetGateway, *testingClient, *scheduler.Stepper)

func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGatewayFunc) func(t *testing.T) {
return func(t *testing.T) {
Expand Down Expand Up @@ -128,7 +127,7 @@ func TestFleetGateway(t *testing.T) {

t.Run("send no event and receive no action", withGateway(agentInfo, settings, func(
t *testing.T,
gateway gateway.FleetGateway,
gateway coordinator.FleetGateway,
client *testingClient,
scheduler *scheduler.Stepper,
) {
Expand Down Expand Up @@ -160,7 +159,7 @@ func TestFleetGateway(t *testing.T) {

t.Run("Successfully connects and receives a series of actions", withGateway(agentInfo, settings, func(
t *testing.T,
gateway gateway.FleetGateway,
gateway coordinator.FleetGateway,
client *testingClient,
scheduler *scheduler.Stepper,
) {
Expand Down Expand Up @@ -321,7 +320,7 @@ func TestRetriesOnFailures(t *testing.T) {
t.Run("When the gateway fails to communicate with the checkin API we will retry",
withGateway(agentInfo, settings, func(
t *testing.T,
gateway gateway.FleetGateway,
gateway coordinator.FleetGateway,
client *testingClient,
scheduler *scheduler.Stepper,
) {
Expand Down Expand Up @@ -369,7 +368,7 @@ func TestRetriesOnFailures(t *testing.T) {
Backoff: backoffSettings{Init: 10 * time.Minute, Max: 20 * time.Minute},
}, func(
t *testing.T,
gateway gateway.FleetGateway,
gateway coordinator.FleetGateway,
client *testingClient,
scheduler *scheduler.Stepper,
) {
Expand Down Expand Up @@ -404,7 +403,7 @@ func emptyStateFetcher() coordinator.State {
return coordinator.State{}
}

func runFleetGateway(ctx context.Context, g gateway.FleetGateway) <-chan error {
func runFleetGateway(ctx context.Context, g coordinator.FleetGateway) <-chan error {
done := make(chan bool)
errCh := make(chan error, 1)
go func() {
Expand Down
3 changes: 1 addition & 2 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/actions/handlers"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
fleetgateway "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway/fleet"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
Expand Down Expand Up @@ -223,7 +222,7 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
}

// runDispatcher passes actions collected from gateway to dispatcher or calls Dispatch with no actions every flushInterval.
func runDispatcher(ctx context.Context, actionDispatcher dispatcher.Dispatcher, fleetGateway gateway.FleetGateway, actionAcker acker.Acker, flushInterval time.Duration) {
func runDispatcher(ctx context.Context, actionDispatcher dispatcher.Dispatcher, fleetGateway coordinator.FleetGateway, actionAcker acker.Acker, flushInterval time.Duration) {
t := time.NewTimer(flushInterval)
for {
select {
Expand Down
6 changes: 4 additions & 2 deletions internal/pkg/fleetapi/checkin_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ func (e *CheckinRequest) Validate() error {
// CheckinResponse is the response send back from the server which contains all the action that
// need to be executed or proxy to running processes.
type CheckinResponse struct {
AckToken string `json:"ack_token"`
Actions Actions `json:"actions"`
AckToken string `json:"ack_token"`
Actions Actions `json:"actions"`
FleetWarning string `json:"-"`
}

// Validate validates the response send from the server.
Expand Down Expand Up @@ -140,6 +141,7 @@ func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinRe
}

checkinResponse := &CheckinResponse{}
checkinResponse.FleetWarning = resp.Header.Get("Warning")
decoder := json.NewDecoder(bytes.NewReader(rs))
if err := decoder.Decode(checkinResponse); err != nil {
return nil, sendDuration, errors.New(err,
Expand Down
6 changes: 6 additions & 0 deletions internal/pkg/fleetapi/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,14 @@ type Sender interface {
URI() string
}

// Default value for Elastic-Api-Version header when sending requests to Fleet (that's the only version we have at the time of writing)
const defaultFleetApiVersion = "2023-06-01"

var baseRoundTrippers = func(rt http.RoundTripper) (http.RoundTripper, error) {
rt = NewFleetUserAgentRoundTripper(rt, release.Version())

rt = NewElasticApiVersionRoundTripper(rt, defaultFleetApiVersion)

return rt, nil
}

Expand Down
Loading

0 comments on commit fe0d427

Please sign in to comment.