diff --git a/.mockery.yaml b/.mockery.yaml index dcb3ae6bb9b..2cf0461278e 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -2,8 +2,4 @@ inpackage: False testonly: False with-expecter: True keeptree: True -case: underscore -note: | - Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - // or more contributor license agreements. Licensed under the Elastic License; - // you may not use this file except in compliance with the Elastic License. +case: underscore \ No newline at end of file diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index f45ba99ba86..e6a2dbc182f 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -855,8 +855,12 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { case configErr := <-c.managerChans.configManagerError: if c.isManaged { + var wErr *WarningError if configErr == nil { c.setFleetState(agentclient.Healthy, "Connected") + } else if errors.As(configErr, &wErr) { + // we received a warning from Fleet, set state to degraded and the warning as state string + c.setFleetState(agentclient.Degraded, wErr.Error()) } else { c.setFleetState(agentclient.Failed, configErr.Error()) } diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 53d4524a1cb..af51ed63523 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -222,6 +222,20 @@ func TestCoordinator_State_ConfigError_Managed(t *testing.T) { return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Healthy && state.FleetMessage == "Connected" }, 3*time.Second, 10*time.Millisecond) + // report a warning + cfgMgr.ReportError(ctx, NewWarningError("some msg from Fleet")) + assert.Eventually(t, func() bool { + state := coord.State() + return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Degraded && state.FleetMessage == "some msg from Fleet" + }, 3*time.Second, 10*time.Millisecond) + + // recover from warning error + cfgMgr.ReportError(ctx, nil) + assert.Eventually(t, func() bool { + state := coord.State() + return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Healthy && state.FleetMessage == "Connected" + }, 3*time.Second, 10*time.Millisecond) + cancel() err = <-coordCh require.NoError(t, err) diff --git a/internal/pkg/agent/application/gateway/gateway.go b/internal/pkg/agent/application/coordinator/gateway.go similarity index 78% rename from internal/pkg/agent/application/gateway/gateway.go rename to internal/pkg/agent/application/coordinator/gateway.go index 6946c8671a4..f32dd439b8d 100644 --- a/internal/pkg/agent/application/gateway/gateway.go +++ b/internal/pkg/agent/application/coordinator/gateway.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package gateway +package coordinator import ( "context" @@ -28,3 +28,16 @@ type FleetGateway interface { // SetClient sets the client for the gateway. SetClient(client.Sender) } + +// WarningError is emitted when we receive a warning in the Fleet response +type WarningError struct { + msg string +} + +func (w WarningError) Error() string { + return w.msg +} + +func NewWarningError(warningMsg string) *WarningError { + return &WarningError{msg: warningMsg} +} diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go index 363df4bdbe5..31c81955a10 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go @@ -12,7 +12,6 @@ import ( eaclient "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/core/backoff" @@ -66,7 +65,7 @@ type stateStore interface { Actions() []fleetapi.Action } -type fleetGateway struct { +type FleetGateway struct { log *logger.Logger client client.Sender scheduler scheduler.Scheduler @@ -89,7 +88,7 @@ func New( acker acker.Acker, stateFetcher func() coordinator.State, stateStore stateStore, -) (gateway.FleetGateway, error) { +) (*FleetGateway, error) { scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter) return newFleetGatewayWithScheduler( @@ -113,8 +112,8 @@ func newFleetGatewayWithScheduler( acker acker.Acker, stateFetcher func() coordinator.State, stateStore stateStore, -) (gateway.FleetGateway, error) { - return &fleetGateway{ +) (*FleetGateway, error) { + return &FleetGateway{ log: log, client: client, settings: settings, @@ -128,11 +127,11 @@ func newFleetGatewayWithScheduler( }, nil } -func (f *fleetGateway) Actions() <-chan []fleetapi.Action { +func (f *FleetGateway) Actions() <-chan []fleetapi.Action { return f.actionCh } -func (f *fleetGateway) Run(ctx context.Context) error { +func (f *FleetGateway) Run(ctx context.Context) error { // Backoff implementation doesn't support the use of a context [cancellation] as the shutdown mechanism. // So we keep a done channel that will be closed when the current context is shutdown. done := make(chan struct{}) @@ -174,11 +173,11 @@ func (f *fleetGateway) Run(ctx context.Context) error { } // Errors returns the channel to watch for reported errors. -func (f *fleetGateway) Errors() <-chan error { +func (f *FleetGateway) Errors() <-chan error { return f.errCh } -func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*fleetapi.CheckinResponse, error) { +func (f *FleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*fleetapi.CheckinResponse, error) { bo.Reset() // Guard if the context is stopped by a out of bound call, @@ -222,7 +221,12 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee } f.checkinFailCounter = 0 - f.errCh <- nil + if resp.FleetWarning != "" { + f.errCh <- coordinator.NewWarningError(resp.FleetWarning) + } else { + f.errCh <- nil + } + // Request was successful, return the collected actions. return resp, nil } @@ -232,7 +236,7 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee return nil, ctx.Err() } -func (f *fleetGateway) convertToCheckinComponents(components []runtime.ComponentComponentState) []fleetapi.CheckinComponent { +func (f *FleetGateway) convertToCheckinComponents(components []runtime.ComponentComponentState) []fleetapi.CheckinComponent { if components == nil { return nil } @@ -307,7 +311,7 @@ func (f *fleetGateway) convertToCheckinComponents(components []runtime.Component return checkinComponents } -func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) { +func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) { ecsMeta, err := info.Metadata(ctx, f.log) if err != nil { f.log.Error(errors.New("failed to load metadata", err)) @@ -367,7 +371,7 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, } // shouldUnenroll checks if the max number of trying an invalid key is reached -func (f *fleetGateway) shouldUnenroll() bool { +func (f *FleetGateway) shouldUnenroll() bool { return f.unauthCounter > maxUnauthCounter } @@ -375,7 +379,7 @@ func isUnauth(err error) bool { return errors.Is(err, client.ErrInvalidAPIKey) } -func (f *fleetGateway) SetClient(c client.Sender) { +func (f *FleetGateway) SetClient(c client.Sender) { f.client = c } diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go index 51fda3581d4..04572eab845 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go @@ -23,7 +23,6 @@ import ( "gotest.tools/assert" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/storage" "github.com/elastic/elastic-agent/internal/pkg/agent/storage/store" @@ -70,7 +69,7 @@ func newTestingClient() *testingClient { return &testingClient{received: make(chan struct{}, 1)} } -type withGatewayFunc func(*testing.T, gateway.FleetGateway, *testingClient, *scheduler.Stepper) +type withGatewayFunc func(*testing.T, coordinator.FleetGateway, *testingClient, *scheduler.Stepper) func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGatewayFunc) func(t *testing.T) { return func(t *testing.T) { @@ -128,7 +127,7 @@ func TestFleetGateway(t *testing.T) { t.Run("send no event and receive no action", withGateway(agentInfo, settings, func( t *testing.T, - gateway gateway.FleetGateway, + gateway coordinator.FleetGateway, client *testingClient, scheduler *scheduler.Stepper, ) { @@ -160,7 +159,7 @@ func TestFleetGateway(t *testing.T) { t.Run("Successfully connects and receives a series of actions", withGateway(agentInfo, settings, func( t *testing.T, - gateway gateway.FleetGateway, + gateway coordinator.FleetGateway, client *testingClient, scheduler *scheduler.Stepper, ) { @@ -321,7 +320,7 @@ func TestRetriesOnFailures(t *testing.T) { t.Run("When the gateway fails to communicate with the checkin API we will retry", withGateway(agentInfo, settings, func( t *testing.T, - gateway gateway.FleetGateway, + gateway coordinator.FleetGateway, client *testingClient, scheduler *scheduler.Stepper, ) { @@ -369,7 +368,7 @@ func TestRetriesOnFailures(t *testing.T) { Backoff: backoffSettings{Init: 10 * time.Minute, Max: 20 * time.Minute}, }, func( t *testing.T, - gateway gateway.FleetGateway, + gateway coordinator.FleetGateway, client *testingClient, scheduler *scheduler.Stepper, ) { @@ -404,7 +403,7 @@ func emptyStateFetcher() coordinator.State { return coordinator.State{} } -func runFleetGateway(ctx context.Context, g gateway.FleetGateway) <-chan error { +func runFleetGateway(ctx context.Context, g coordinator.FleetGateway) <-chan error { done := make(chan bool) errCh := make(chan error, 1) go func() { diff --git a/internal/pkg/agent/application/managed_mode.go b/internal/pkg/agent/application/managed_mode.go index 6895003c38e..f29be7fe258 100644 --- a/internal/pkg/agent/application/managed_mode.go +++ b/internal/pkg/agent/application/managed_mode.go @@ -13,7 +13,6 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/actions/handlers" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway" fleetgateway "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway/fleet" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" @@ -223,7 +222,7 @@ func (m *managedConfigManager) Run(ctx context.Context) error { } // runDispatcher passes actions collected from gateway to dispatcher or calls Dispatch with no actions every flushInterval. -func runDispatcher(ctx context.Context, actionDispatcher dispatcher.Dispatcher, fleetGateway gateway.FleetGateway, actionAcker acker.Acker, flushInterval time.Duration) { +func runDispatcher(ctx context.Context, actionDispatcher dispatcher.Dispatcher, fleetGateway coordinator.FleetGateway, actionAcker acker.Acker, flushInterval time.Duration) { t := time.NewTimer(flushInterval) for { select { diff --git a/internal/pkg/fleetapi/checkin_cmd.go b/internal/pkg/fleetapi/checkin_cmd.go index 7150dade0f7..b52eeb3903f 100644 --- a/internal/pkg/fleetapi/checkin_cmd.go +++ b/internal/pkg/fleetapi/checkin_cmd.go @@ -77,8 +77,9 @@ func (e *CheckinRequest) Validate() error { // CheckinResponse is the response send back from the server which contains all the action that // need to be executed or proxy to running processes. type CheckinResponse struct { - AckToken string `json:"ack_token"` - Actions Actions `json:"actions"` + AckToken string `json:"ack_token"` + Actions Actions `json:"actions"` + FleetWarning string `json:"-"` } // Validate validates the response send from the server. @@ -140,6 +141,7 @@ func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinRe } checkinResponse := &CheckinResponse{} + checkinResponse.FleetWarning = resp.Header.Get("Warning") decoder := json.NewDecoder(bytes.NewReader(rs)) if err := decoder.Decode(checkinResponse); err != nil { return nil, sendDuration, errors.New(err, diff --git a/internal/pkg/fleetapi/client/client.go b/internal/pkg/fleetapi/client/client.go index 0f478497bb6..b6c9f7584ef 100644 --- a/internal/pkg/fleetapi/client/client.go +++ b/internal/pkg/fleetapi/client/client.go @@ -34,8 +34,14 @@ type Sender interface { URI() string } +// Default value for Elastic-Api-Version header when sending requests to Fleet (that's the only version we have at the time of writing) +const defaultFleetApiVersion = "2023-06-01" + var baseRoundTrippers = func(rt http.RoundTripper) (http.RoundTripper, error) { rt = NewFleetUserAgentRoundTripper(rt, release.Version()) + + rt = NewElasticApiVersionRoundTripper(rt, defaultFleetApiVersion) + return rt, nil } diff --git a/internal/pkg/fleetapi/client/client_test.go b/internal/pkg/fleetapi/client/client_test.go index f8d068663dd..cc5d2501aa8 100644 --- a/internal/pkg/fleetapi/client/client_test.go +++ b/internal/pkg/fleetapi/client/client_test.go @@ -9,6 +9,7 @@ import ( "fmt" "io/ioutil" "net/http" + "net/http/httptest" "strings" "testing" "time" @@ -18,6 +19,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/remote" + "github.com/elastic/elastic-agent/pkg/core/logger" ) func TestHTTPClient(t *testing.T) { @@ -71,7 +73,10 @@ func TestHTTPClient(t *testing.T) { }) require.NoError(t, err) - _, err = client.Send(ctx, "GET", "/echo-hello", nil, nil, nil) + resp, err := client.Send(ctx, "GET", "/echo-hello", nil, nil, nil) + if err == nil { + defer resp.Body.Close() + } require.Error(t, err) }, )) @@ -118,7 +123,10 @@ func TestHTTPClient(t *testing.T) { }) require.NoError(t, err) - _, err = client.Send(timeoutCtx, "GET", "/echo-hello", nil, nil, nil) + resp, err := client.Send(timeoutCtx, "GET", "/echo-hello", nil, nil, nil) + if err == nil { + defer resp.Body.Close() + } require.Error(t, err) }) } @@ -147,3 +155,94 @@ func TestExtract(t *testing.T) { assert.True(t, strings.Index(err.Error(), "fails because") > 0) }) } + +func TestElasticApiVersion(t *testing.T) { + t.Run("verify that Elastic-Api-Version header is present", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mux := http.NewServeMux() + mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { + assert.Equal(t, request.Header.Get(elasticApiVersionHeaderKey), defaultFleetApiVersion) + writer.WriteHeader(http.StatusOK) + }) + + ts := httptest.NewServer(mux) + defer ts.Close() + + testLogger, _ := logger.NewTesting("testElasticApiVersion") + + clt, err := NewWithConfig(testLogger, remote.Config{ + Hosts: []string{ts.URL}, + }) + require.NoError(t, err) + + resp, err := clt.Send(ctx, http.MethodGet, "/", nil, nil, nil) + if assert.NoError(t, err) { + defer resp.Body.Close() + } + }) + + t.Run("verify that we log a downgrade request", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mux := http.NewServeMux() + mux.HandleFunc("/downgrade", func(writer http.ResponseWriter, request *http.Request) { + assert.Equal(t, request.Header.Get(elasticApiVersionHeaderKey), defaultFleetApiVersion) + // request to downgrade to a completely fictitious version (just testing that we get a log for that) + writer.Header().Add(elasticApiVersionHeaderKey, request.URL.Query().Get("version")) + writer.WriteHeader(http.StatusBadRequest) + }) + + ts := httptest.NewServer(mux) + defer ts.Close() + + testLogger, obsLogs := logger.NewTesting("testElasticApiVersion") + + clt, err := NewWithConfig(testLogger, remote.Config{ + Hosts: []string{ts.URL}, + }) + require.NoError(t, err) + + resp, err := clt.Send(ctx, http.MethodGet, "/downgrade", map[string][]string{"version": {"2020-01-01"}}, nil, nil) + if assert.NoError(t, err) { + defer resp.Body.Close() + } + logs := obsLogs.FilterMessageSnippet("fleet requested a different api version \"2020-01-01\"").All() + t.Logf("retrieved logs: %v", logs) + assert.NotEmptyf(t, logs, "downgrade response was not logged") + }) + + t.Run("verify that we log an incoming warning", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mux := http.NewServeMux() + mux.HandleFunc("/warning", func(writer http.ResponseWriter, request *http.Request) { + assert.Equal(t, request.Header.Get(elasticApiVersionHeaderKey), defaultFleetApiVersion) + // send back a warning simulating an unsupported api version + writer.Header().Add("Warning", request.URL.Query().Get("warning_msg")) + writer.WriteHeader(http.StatusBadRequest) + }) + + ts := httptest.NewServer(mux) + defer ts.Close() + + testLogger, obsLogs := logger.NewTesting("testElasticApiVersion") + + clt, err := NewWithConfig(testLogger, remote.Config{ + Hosts: []string{ts.URL}, + }) + require.NoError(t, err) + + warningText := "API version is no longer supported. Upgrade immediately!" + resp, err := clt.Send(ctx, http.MethodGet, "/warning", map[string][]string{"warning_msg": {warningText}}, nil, nil) + if assert.NoError(t, err) { + defer resp.Body.Close() + } + logs := obsLogs.FilterMessageSnippet("API version is no longer supported. Upgrade immediately!").All() + t.Logf("retrieved logs: %v", logs) + assert.NotEmptyf(t, logs, "warning was not logged") + }) +} diff --git a/internal/pkg/fleetapi/client/round_trippers.go b/internal/pkg/fleetapi/client/round_trippers.go index b5ce1bea3f2..57835d31f61 100644 --- a/internal/pkg/fleetapi/client/round_trippers.go +++ b/internal/pkg/fleetapi/client/round_trippers.go @@ -69,3 +69,22 @@ func NewFleetAuthRoundTripper( } return &FleetAuthRoundTripper{rt: wrapped, apiKey: apiKey}, nil } + +// ElasticApiVersionRoundTripper adds an Elastic-Api-Version header on every request. +type ElasticApiVersionRoundTripper struct { + rt http.RoundTripper + elasticApiVersion string +} + +const elasticApiVersionHeaderKey = "Elastic-Api-Version" + +// RoundTrip adds an Elastic-Api-Version header on every request. +func (r *ElasticApiVersionRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + req.Header.Set(elasticApiVersionHeaderKey, r.elasticApiVersion) + + return r.rt.RoundTrip(req) +} + +func NewElasticApiVersionRoundTripper(inner http.RoundTripper, elasticApiVersion string) http.RoundTripper { + return &ElasticApiVersionRoundTripper{elasticApiVersion: elasticApiVersion, rt: inner} +} diff --git a/internal/pkg/remote/client.go b/internal/pkg/remote/client.go index cc3017dc17f..078016d1d87 100644 --- a/internal/pkg/remote/client.go +++ b/internal/pkg/remote/client.go @@ -220,6 +220,7 @@ func (c *Client) Send( c.log.With("error", err).Debugf(msg) continue } + c.checkApiVersionHeaders(reqID, resp) return resp, nil } @@ -227,6 +228,22 @@ func (c *Client) Send( return nil, fmt.Errorf("all hosts failed: %w", multiErr) } +func (c *Client) checkApiVersionHeaders(reqID string, resp *http.Response) { + const elasticApiVersionHeaderKey = "Elastic-Api-Version" + const warningHeaderKey = "Warning" + + warning := resp.Header.Get(warningHeaderKey) + if warning != "" { + c.log.With("http.request.id", reqID).Warnf("warning in fleet response: %q", warning) + } + + if downgradeVersion := resp.Header.Get(elasticApiVersionHeaderKey); resp.StatusCode == http.StatusBadRequest && downgradeVersion != "" { + // fleet server requested a downgrade to a different api version, we should bubble up an error until some kind + // of fallback mechanism can instantiate the requested version. This is not yet implemented so we log an error + c.log.With("http.request.id", reqID).Errorf("fleet requested a different api version %q but this is currently not implemented: %q", downgradeVersion) + } +} + // URI returns the remote URI. func (c *Client) URI() string { host := c.config.GetHosts()[0]