Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement basic version negotiation with Fleet Server #3383

Merged
merged 8 commits into from
Sep 12, 2023
6 changes: 1 addition & 5 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,4 @@ inpackage: False
testonly: False
with-expecter: True
keeptree: True
case: underscore
note: |
Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
case: underscore
4 changes: 4 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,8 +855,12 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {

case configErr := <-c.managerChans.configManagerError:
if c.isManaged {
var wErr *WarningError
if configErr == nil {
c.setFleetState(agentclient.Healthy, "Connected")
} else if errors.As(configErr, &wErr) {
// we received a warning from Fleet, set state to degraded and the warning as state string
c.setFleetState(agentclient.Degraded, wErr.Error())
} else {
c.setFleetState(agentclient.Failed, configErr.Error())
}
Expand Down
14 changes: 14 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,20 @@ func TestCoordinator_State_ConfigError_Managed(t *testing.T) {
return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Healthy && state.FleetMessage == "Connected"
}, 3*time.Second, 10*time.Millisecond)

// report a warning
cfgMgr.ReportError(ctx, NewWarningError("some msg from Fleet"))
assert.Eventually(t, func() bool {
state := coord.State()
return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Degraded && state.FleetMessage == "some msg from Fleet"
}, 3*time.Second, 10*time.Millisecond)

// recover from warning error
cfgMgr.ReportError(ctx, nil)
assert.Eventually(t, func() bool {
state := coord.State()
return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Healthy && state.FleetMessage == "Connected"
}, 3*time.Second, 10*time.Millisecond)

cancel()
err = <-coordCh
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package gateway
package coordinator

import (
"context"
Expand All @@ -28,3 +28,16 @@ type FleetGateway interface {
// SetClient sets the client for the gateway.
SetClient(client.Sender)
}

// WarningError is emitted when we receive a warning in the Fleet response
type WarningError struct {
msg string
}

func (w WarningError) Error() string {
return w.msg
}

func NewWarningError(warningMsg string) *WarningError {
return &WarningError{msg: warningMsg}
}
32 changes: 18 additions & 14 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

eaclient "github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
Expand Down Expand Up @@ -66,7 +65,7 @@ type stateStore interface {
Actions() []fleetapi.Action
}

type fleetGateway struct {
type FleetGateway struct {
log *logger.Logger
client client.Sender
scheduler scheduler.Scheduler
Expand All @@ -89,7 +88,7 @@ func New(
acker acker.Acker,
stateFetcher func() coordinator.State,
stateStore stateStore,
) (gateway.FleetGateway, error) {
) (*FleetGateway, error) {

scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
return newFleetGatewayWithScheduler(
Expand All @@ -113,8 +112,8 @@ func newFleetGatewayWithScheduler(
acker acker.Acker,
stateFetcher func() coordinator.State,
stateStore stateStore,
) (gateway.FleetGateway, error) {
return &fleetGateway{
) (*FleetGateway, error) {
return &FleetGateway{
log: log,
client: client,
settings: settings,
Expand All @@ -128,11 +127,11 @@ func newFleetGatewayWithScheduler(
}, nil
}

func (f *fleetGateway) Actions() <-chan []fleetapi.Action {
func (f *FleetGateway) Actions() <-chan []fleetapi.Action {
return f.actionCh
}

func (f *fleetGateway) Run(ctx context.Context) error {
func (f *FleetGateway) Run(ctx context.Context) error {
// Backoff implementation doesn't support the use of a context [cancellation] as the shutdown mechanism.
// So we keep a done channel that will be closed when the current context is shutdown.
done := make(chan struct{})
Expand Down Expand Up @@ -174,11 +173,11 @@ func (f *fleetGateway) Run(ctx context.Context) error {
}

// Errors returns the channel to watch for reported errors.
func (f *fleetGateway) Errors() <-chan error {
func (f *FleetGateway) Errors() <-chan error {
return f.errCh
}

func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*fleetapi.CheckinResponse, error) {
func (f *FleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*fleetapi.CheckinResponse, error) {
bo.Reset()

// Guard if the context is stopped by a out of bound call,
Expand Down Expand Up @@ -222,7 +221,12 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
}

f.checkinFailCounter = 0
f.errCh <- nil
if resp.FleetWarning != "" {
f.errCh <- coordinator.NewWarningError(resp.FleetWarning)
} else {
f.errCh <- nil
}

// Request was successful, return the collected actions.
return resp, nil
}
Expand All @@ -232,7 +236,7 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
return nil, ctx.Err()
}

func (f *fleetGateway) convertToCheckinComponents(components []runtime.ComponentComponentState) []fleetapi.CheckinComponent {
func (f *FleetGateway) convertToCheckinComponents(components []runtime.ComponentComponentState) []fleetapi.CheckinComponent {
if components == nil {
return nil
}
Expand Down Expand Up @@ -307,7 +311,7 @@ func (f *fleetGateway) convertToCheckinComponents(components []runtime.Component
return checkinComponents
}

func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) {
func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) {
ecsMeta, err := info.Metadata(ctx, f.log)
if err != nil {
f.log.Error(errors.New("failed to load metadata", err))
Expand Down Expand Up @@ -367,15 +371,15 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
}

// shouldUnenroll checks if the max number of trying an invalid key is reached
func (f *fleetGateway) shouldUnenroll() bool {
func (f *FleetGateway) shouldUnenroll() bool {
return f.unauthCounter > maxUnauthCounter
}

func isUnauth(err error) bool {
return errors.Is(err, client.ErrInvalidAPIKey)
}

func (f *fleetGateway) SetClient(c client.Sender) {
func (f *FleetGateway) SetClient(c client.Sender) {
f.client = c
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"gotest.tools/assert"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
Expand Down Expand Up @@ -70,7 +69,7 @@ func newTestingClient() *testingClient {
return &testingClient{received: make(chan struct{}, 1)}
}

type withGatewayFunc func(*testing.T, gateway.FleetGateway, *testingClient, *scheduler.Stepper)
type withGatewayFunc func(*testing.T, coordinator.FleetGateway, *testingClient, *scheduler.Stepper)

func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGatewayFunc) func(t *testing.T) {
return func(t *testing.T) {
Expand Down Expand Up @@ -128,7 +127,7 @@ func TestFleetGateway(t *testing.T) {

t.Run("send no event and receive no action", withGateway(agentInfo, settings, func(
t *testing.T,
gateway gateway.FleetGateway,
gateway coordinator.FleetGateway,
client *testingClient,
scheduler *scheduler.Stepper,
) {
Expand Down Expand Up @@ -160,7 +159,7 @@ func TestFleetGateway(t *testing.T) {

t.Run("Successfully connects and receives a series of actions", withGateway(agentInfo, settings, func(
t *testing.T,
gateway gateway.FleetGateway,
gateway coordinator.FleetGateway,
client *testingClient,
scheduler *scheduler.Stepper,
) {
Expand Down Expand Up @@ -321,7 +320,7 @@ func TestRetriesOnFailures(t *testing.T) {
t.Run("When the gateway fails to communicate with the checkin API we will retry",
withGateway(agentInfo, settings, func(
t *testing.T,
gateway gateway.FleetGateway,
gateway coordinator.FleetGateway,
client *testingClient,
scheduler *scheduler.Stepper,
) {
Expand Down Expand Up @@ -369,7 +368,7 @@ func TestRetriesOnFailures(t *testing.T) {
Backoff: backoffSettings{Init: 10 * time.Minute, Max: 20 * time.Minute},
}, func(
t *testing.T,
gateway gateway.FleetGateway,
gateway coordinator.FleetGateway,
client *testingClient,
scheduler *scheduler.Stepper,
) {
Expand Down Expand Up @@ -404,7 +403,7 @@ func emptyStateFetcher() coordinator.State {
return coordinator.State{}
}

func runFleetGateway(ctx context.Context, g gateway.FleetGateway) <-chan error {
func runFleetGateway(ctx context.Context, g coordinator.FleetGateway) <-chan error {
done := make(chan bool)
errCh := make(chan error, 1)
go func() {
Expand Down
3 changes: 1 addition & 2 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/actions/handlers"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
fleetgateway "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway/fleet"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
Expand Down Expand Up @@ -223,7 +222,7 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
}

// runDispatcher passes actions collected from gateway to dispatcher or calls Dispatch with no actions every flushInterval.
func runDispatcher(ctx context.Context, actionDispatcher dispatcher.Dispatcher, fleetGateway gateway.FleetGateway, actionAcker acker.Acker, flushInterval time.Duration) {
func runDispatcher(ctx context.Context, actionDispatcher dispatcher.Dispatcher, fleetGateway coordinator.FleetGateway, actionAcker acker.Acker, flushInterval time.Duration) {
t := time.NewTimer(flushInterval)
for {
select {
Expand Down
6 changes: 4 additions & 2 deletions internal/pkg/fleetapi/checkin_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ func (e *CheckinRequest) Validate() error {
// CheckinResponse is the response send back from the server which contains all the action that
// need to be executed or proxy to running processes.
type CheckinResponse struct {
AckToken string `json:"ack_token"`
Actions Actions `json:"actions"`
AckToken string `json:"ack_token"`
Actions Actions `json:"actions"`
FleetWarning string `json:"-"`
}

// Validate validates the response send from the server.
Expand Down Expand Up @@ -140,6 +141,7 @@ func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinRe
}

checkinResponse := &CheckinResponse{}
checkinResponse.FleetWarning = resp.Header.Get("Warning")
decoder := json.NewDecoder(bytes.NewReader(rs))
if err := decoder.Decode(checkinResponse); err != nil {
return nil, sendDuration, errors.New(err,
Expand Down
6 changes: 6 additions & 0 deletions internal/pkg/fleetapi/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,14 @@ type Sender interface {
URI() string
}

// Default value for Elastic-Api-Version header when sending requests to Fleet (that's the only version we have at the time of writing)
const defaultFleetApiVersion = "2023-06-01"

var baseRoundTrippers = func(rt http.RoundTripper) (http.RoundTripper, error) {
rt = NewFleetUserAgentRoundTripper(rt, release.Version())

rt = NewElasticApiVersionRoundTripper(rt, defaultFleetApiVersion)

return rt, nil
}

Expand Down
Loading