From a00e973c17d02daef4bfcd7c39a1b76fdc084fa8 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Thu, 7 Sep 2023 17:47:04 +0200 Subject: [PATCH 1/8] Initial implementation of elastic api version in roundtrip and fleet client --- .mockery.yaml | 6 +- internal/pkg/fleetapi/client/client.go | 6 ++ internal/pkg/fleetapi/client/client_test.go | 62 +++++++++++++++++++ .../pkg/fleetapi/client/round_trippers.go | 19 ++++++ internal/pkg/remote/client.go | 17 +++++ 5 files changed, 105 insertions(+), 5 deletions(-) diff --git a/.mockery.yaml b/.mockery.yaml index dcb3ae6bb9b..2cf0461278e 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -2,8 +2,4 @@ inpackage: False testonly: False with-expecter: True keeptree: True -case: underscore -note: | - Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - // or more contributor license agreements. Licensed under the Elastic License; - // you may not use this file except in compliance with the Elastic License. +case: underscore \ No newline at end of file diff --git a/internal/pkg/fleetapi/client/client.go b/internal/pkg/fleetapi/client/client.go index 0f478497bb6..b6c9f7584ef 100644 --- a/internal/pkg/fleetapi/client/client.go +++ b/internal/pkg/fleetapi/client/client.go @@ -34,8 +34,14 @@ type Sender interface { URI() string } +// Default value for Elastic-Api-Version header when sending requests to Fleet (that's the only version we have at the time of writing) +const defaultFleetApiVersion = "2023-06-01" + var baseRoundTrippers = func(rt http.RoundTripper) (http.RoundTripper, error) { rt = NewFleetUserAgentRoundTripper(rt, release.Version()) + + rt = NewElasticApiVersionRoundTripper(rt, defaultFleetApiVersion) + return rt, nil } diff --git a/internal/pkg/fleetapi/client/client_test.go b/internal/pkg/fleetapi/client/client_test.go index f8d068663dd..88ca8c73575 100644 --- a/internal/pkg/fleetapi/client/client_test.go +++ b/internal/pkg/fleetapi/client/client_test.go @@ -9,6 +9,7 @@ import ( "fmt" "io/ioutil" 
"net/http" + "net/http/httptest" "strings" "testing" "time" @@ -18,6 +19,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/remote" + "github.com/elastic/elastic-agent/pkg/core/logger" ) func TestHTTPClient(t *testing.T) { @@ -147,3 +149,63 @@ func TestExtract(t *testing.T) { assert.True(t, strings.Index(err.Error(), "fails because") > 0) }) } + +func TestElasticApiVersion(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { + assert.Equal(t, request.Header.Get(elasticApiVersionHeaderKey), defaultFleetApiVersion) + writer.WriteHeader(http.StatusOK) + }) + + mux.HandleFunc("/downgrade", func(writer http.ResponseWriter, request *http.Request) { + assert.Equal(t, request.Header.Get(elasticApiVersionHeaderKey), defaultFleetApiVersion) + // request to downgrade to a completely fictitious version (just testing that we get a log for that) + writer.Header().Add(elasticApiVersionHeaderKey, request.URL.Query().Get("version")) + writer.WriteHeader(http.StatusBadRequest) + }) + + mux.HandleFunc("/warning", func(writer http.ResponseWriter, request *http.Request) { + assert.Equal(t, request.Header.Get(elasticApiVersionHeaderKey), defaultFleetApiVersion) + // send back a warning simulating an unsupported api version + writer.Header().Add("Warning", request.URL.Query().Get("warning_msg")) + writer.WriteHeader(http.StatusBadRequest) + }) + + ts := httptest.NewServer(mux) + defer ts.Close() + + testLogger, obsLogs := logger.NewTesting("testElasticApiVersion") + + clt, err := NewWithConfig(testLogger, remote.Config{ + Hosts: []string{ts.URL}, + }) + require.NoError(t, err) + + t.Run("verify that Elastic-Api-Version header is present", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, err = clt.Send(ctx, http.MethodGet, "/", nil, nil, nil) + assert.NoError(t, err) + }) + + t.Run("verify that we log a 
downgrade request", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, err = clt.Send(ctx, http.MethodGet, "/downgrade", map[string][]string{"version": {"2020-01-01"}}, nil, nil) + assert.NoError(t, err) + logs := obsLogs.FilterMessageSnippet("fleet requested a different api version \"2020-01-01\"").All() + t.Logf("retrieved logs: %v", logs) + assert.NotEmptyf(t, logs, "downgrade response was not logged") + }) + + t.Run("verify that we log an incoming warning", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + warningText := "API version is no longer supported. Upgrade immediately!" + _, err = clt.Send(ctx, http.MethodGet, "/warning", map[string][]string{"warning_msg": {warningText}}, nil, nil) + assert.NoError(t, err) + logs := obsLogs.FilterMessageSnippet("API version is no longer supported. Upgrade immediately!").All() + t.Logf("retrieved logs: %v", logs) + assert.NotEmptyf(t, logs, "warning was not logged") + }) +} diff --git a/internal/pkg/fleetapi/client/round_trippers.go b/internal/pkg/fleetapi/client/round_trippers.go index b5ce1bea3f2..57835d31f61 100644 --- a/internal/pkg/fleetapi/client/round_trippers.go +++ b/internal/pkg/fleetapi/client/round_trippers.go @@ -69,3 +69,22 @@ func NewFleetAuthRoundTripper( } return &FleetAuthRoundTripper{rt: wrapped, apiKey: apiKey}, nil } + +// ElasticApiVersionRoundTripper adds an Elastic-Api-Version header on every request. +type ElasticApiVersionRoundTripper struct { + rt http.RoundTripper + elasticApiVersion string +} + +const elasticApiVersionHeaderKey = "Elastic-Api-Version" + +// RoundTrip adds an Elastic-Api-Version header on every request. 
+func (r *ElasticApiVersionRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + req.Header.Set(elasticApiVersionHeaderKey, r.elasticApiVersion) + + return r.rt.RoundTrip(req) +} + +func NewElasticApiVersionRoundTripper(inner http.RoundTripper, elasticApiVersion string) http.RoundTripper { + return &ElasticApiVersionRoundTripper{elasticApiVersion: elasticApiVersion, rt: inner} +} diff --git a/internal/pkg/remote/client.go b/internal/pkg/remote/client.go index cc3017dc17f..078016d1d87 100644 --- a/internal/pkg/remote/client.go +++ b/internal/pkg/remote/client.go @@ -220,6 +220,7 @@ func (c *Client) Send( c.log.With("error", err).Debugf(msg) continue } + c.checkApiVersionHeaders(reqID, resp) return resp, nil } @@ -227,6 +228,22 @@ func (c *Client) Send( return nil, fmt.Errorf("all hosts failed: %w", multiErr) } +func (c *Client) checkApiVersionHeaders(reqID string, resp *http.Response) { + const elasticApiVersionHeaderKey = "Elastic-Api-Version" + const warningHeaderKey = "Warning" + + warning := resp.Header.Get(warningHeaderKey) + if warning != "" { + c.log.With("http.request.id", reqID).Warnf("warning in fleet response: %q", warning) + } + + if downgradeVersion := resp.Header.Get(elasticApiVersionHeaderKey); resp.StatusCode == http.StatusBadRequest && downgradeVersion != "" { + // fleet server requested a downgrade to a different api version, we should bubble up an error until some kind + // of fallback mechanism can instantiate the requested version. This is not yet implemented so we log an error + c.log.With("http.request.id", reqID).Errorf("fleet requested a different api version %q but this is currently not implemented", downgradeVersion) + } +} + // URI returns the remote URI. 
func (c *Client) URI() string { host := c.config.GetHosts()[0] From 0445cd10410509e8f98cca2d4c4d07866dcaed96 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Fri, 8 Sep 2023 17:52:42 +0200 Subject: [PATCH 2/8] lint --- internal/pkg/fleetapi/client/client_test.go | 90 ++++++++++++++------- 1 file changed, 59 insertions(+), 31 deletions(-) diff --git a/internal/pkg/fleetapi/client/client_test.go b/internal/pkg/fleetapi/client/client_test.go index 88ca8c73575..c68c2b5208f 100644 --- a/internal/pkg/fleetapi/client/client_test.go +++ b/internal/pkg/fleetapi/client/client_test.go @@ -151,47 +151,55 @@ func TestExtract(t *testing.T) { } func TestElasticApiVersion(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { - assert.Equal(t, request.Header.Get(elasticApiVersionHeaderKey), defaultFleetApiVersion) - writer.WriteHeader(http.StatusOK) - }) - - mux.HandleFunc("/downgrade", func(writer http.ResponseWriter, request *http.Request) { - assert.Equal(t, request.Header.Get(elasticApiVersionHeaderKey), defaultFleetApiVersion) - // request to downgrade to a completely fictitious version (just testing that we get a log for that) - writer.Header().Add(elasticApiVersionHeaderKey, request.URL.Query().Get("version")) - writer.WriteHeader(http.StatusBadRequest) - }) + t.Run("verify that Elastic-Api-Version header is present", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - mux.HandleFunc("/warning", func(writer http.ResponseWriter, request *http.Request) { - assert.Equal(t, request.Header.Get(elasticApiVersionHeaderKey), defaultFleetApiVersion) - // send back a warning simulating an unsupported api version - writer.Header().Add("Warning", request.URL.Query().Get("warning_msg")) - writer.WriteHeader(http.StatusBadRequest) - }) + mux := http.NewServeMux() + mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { + assert.Equal(t, 
request.Header.Get(elasticApiVersionHeaderKey), defaultFleetApiVersion) + writer.WriteHeader(http.StatusOK) + }) - ts := httptest.NewServer(mux) - defer ts.Close() + ts := httptest.NewServer(mux) + defer ts.Close() - testLogger, obsLogs := logger.NewTesting("testElasticApiVersion") + testLogger, _ := logger.NewTesting("testElasticApiVersion") - clt, err := NewWithConfig(testLogger, remote.Config{ - Hosts: []string{ts.URL}, - }) - require.NoError(t, err) + clt, err := NewWithConfig(testLogger, remote.Config{ + Hosts: []string{ts.URL}, + }) + require.NoError(t, err) - t.Run("verify that Elastic-Api-Version header is present", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - _, err = clt.Send(ctx, http.MethodGet, "/", nil, nil, nil) + resp, err := clt.Send(ctx, http.MethodGet, "/", nil, nil, nil) + defer resp.Body.Close() assert.NoError(t, err) }) t.Run("verify that we log a downgrade request", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, err = clt.Send(ctx, http.MethodGet, "/downgrade", map[string][]string{"version": {"2020-01-01"}}, nil, nil) + + mux := http.NewServeMux() + mux.HandleFunc("/downgrade", func(writer http.ResponseWriter, request *http.Request) { + assert.Equal(t, request.Header.Get(elasticApiVersionHeaderKey), defaultFleetApiVersion) + // request to downgrade to a completely fictitious version (just testing that we get a log for that) + writer.Header().Add(elasticApiVersionHeaderKey, request.URL.Query().Get("version")) + writer.WriteHeader(http.StatusBadRequest) + }) + + ts := httptest.NewServer(mux) + defer ts.Close() + + testLogger, obsLogs := logger.NewTesting("testElasticApiVersion") + + clt, err := NewWithConfig(testLogger, remote.Config{ + Hosts: []string{ts.URL}, + }) + require.NoError(t, err) + + resp, err := clt.Send(ctx, http.MethodGet, "/downgrade", map[string][]string{"version": {"2020-01-01"}}, nil, nil) + defer resp.Body.Close() 
assert.NoError(t, err) logs := obsLogs.FilterMessageSnippet("fleet requested a different api version \"2020-01-01\"").All() t.Logf("retrieved logs: %v", logs) @@ -201,8 +209,28 @@ func TestElasticApiVersion(t *testing.T) { t.Run("verify that we log an incoming warning", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + mux := http.NewServeMux() + mux.HandleFunc("/warning", func(writer http.ResponseWriter, request *http.Request) { + assert.Equal(t, request.Header.Get(elasticApiVersionHeaderKey), defaultFleetApiVersion) + // send back a warning simulating an unsupported api version + writer.Header().Add("Warning", request.URL.Query().Get("warning_msg")) + writer.WriteHeader(http.StatusBadRequest) + }) + + ts := httptest.NewServer(mux) + defer ts.Close() + + testLogger, obsLogs := logger.NewTesting("testElasticApiVersion") + + clt, err := NewWithConfig(testLogger, remote.Config{ + Hosts: []string{ts.URL}, + }) + require.NoError(t, err) + warningText := "API version is no longer supported. Upgrade immediately!" - _, err = clt.Send(ctx, http.MethodGet, "/warning", map[string][]string{"warning_msg": {warningText}}, nil, nil) + resp, err := clt.Send(ctx, http.MethodGet, "/warning", map[string][]string{"warning_msg": {warningText}}, nil, nil) + defer resp.Body.Close() assert.NoError(t, err) logs := obsLogs.FilterMessageSnippet("API version is no longer supported. 
Upgrade immediately!").All() t.Logf("retrieved logs: %v", logs) From bde5dd8d26e795678574d65f69f66f9153feca38 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Mon, 11 Sep 2023 11:07:37 +0200 Subject: [PATCH 3/8] Move FleetGateway interface --- .../application/coordinator/coordinator.go | 4 +++ .../{gateway => coordinator}/gateway.go | 11 ++++++- .../gateway/fleet/fleet_gateway.go | 32 +++++++++++-------- .../gateway/fleet/fleet_gateway_test.go | 13 ++++---- .../pkg/agent/application/managed_mode.go | 3 +- 5 files changed, 39 insertions(+), 24 deletions(-) rename internal/pkg/agent/application/{gateway => coordinator}/gateway.go (84%) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index f45ba99ba86..21d1fde4f86 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -855,8 +855,12 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { case configErr := <-c.managerChans.configManagerError: if c.isManaged { + var wErr *WarningError if configErr == nil { c.setFleetState(agentclient.Healthy, "Connected") + } else if errors.As(configErr, &wErr) { + // we received a warning from Fleet, set state to degraded and the warning as state string + c.setFleetState(agentclient.Degraded, wErr.Warning) } else { c.setFleetState(agentclient.Failed, configErr.Error()) } diff --git a/internal/pkg/agent/application/gateway/gateway.go b/internal/pkg/agent/application/coordinator/gateway.go similarity index 84% rename from internal/pkg/agent/application/gateway/gateway.go rename to internal/pkg/agent/application/coordinator/gateway.go index 6946c8671a4..a0071daf4b0 100644 --- a/internal/pkg/agent/application/gateway/gateway.go +++ b/internal/pkg/agent/application/coordinator/gateway.go @@ -2,7 +2,7 @@ // or more contributor license agreements. 
Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package gateway +package coordinator import ( "context" @@ -28,3 +28,12 @@ type FleetGateway interface { // SetClient sets the client for the gateway. SetClient(client.Sender) } + +// WarningError is emitted when we receive a warning in the Fleet response +type WarningError struct { + Warning string +} + +func (w WarningError) Error() string { + return w.Warning +} diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go index 363df4bdbe5..7a8bffebe78 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go @@ -12,7 +12,6 @@ import ( eaclient "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/core/backoff" @@ -66,7 +65,7 @@ type stateStore interface { Actions() []fleetapi.Action } -type fleetGateway struct { +type FleetGateway struct { log *logger.Logger client client.Sender scheduler scheduler.Scheduler @@ -89,7 +88,7 @@ func New( acker acker.Acker, stateFetcher func() coordinator.State, stateStore stateStore, -) (gateway.FleetGateway, error) { +) (*FleetGateway, error) { scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter) return newFleetGatewayWithScheduler( @@ -113,8 +112,8 @@ func newFleetGatewayWithScheduler( acker acker.Acker, stateFetcher func() coordinator.State, stateStore stateStore, -) (gateway.FleetGateway, error) { - return &fleetGateway{ +) (*FleetGateway, error) { + return &FleetGateway{ 
log: log, client: client, settings: settings, @@ -128,11 +127,11 @@ func newFleetGatewayWithScheduler( }, nil } -func (f *fleetGateway) Actions() <-chan []fleetapi.Action { +func (f *FleetGateway) Actions() <-chan []fleetapi.Action { return f.actionCh } -func (f *fleetGateway) Run(ctx context.Context) error { +func (f *FleetGateway) Run(ctx context.Context) error { // Backoff implementation doesn't support the use of a context [cancellation] as the shutdown mechanism. // So we keep a done channel that will be closed when the current context is shutdown. done := make(chan struct{}) @@ -174,11 +173,11 @@ func (f *fleetGateway) Run(ctx context.Context) error { } // Errors returns the channel to watch for reported errors. -func (f *fleetGateway) Errors() <-chan error { +func (f *FleetGateway) Errors() <-chan error { return f.errCh } -func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*fleetapi.CheckinResponse, error) { +func (f *FleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*fleetapi.CheckinResponse, error) { bo.Reset() // Guard if the context is stopped by a out of bound call, @@ -222,7 +221,12 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee } f.checkinFailCounter = 0 - f.errCh <- nil + if resp.FleetWarning != "" { + f.errCh <- &coordinator.WarningError{Warning: resp.FleetWarning} + } else { + f.errCh <- nil + } + // Request was successful, return the collected actions. 
return resp, nil } @@ -232,7 +236,7 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee return nil, ctx.Err() } -func (f *fleetGateway) convertToCheckinComponents(components []runtime.ComponentComponentState) []fleetapi.CheckinComponent { +func (f *FleetGateway) convertToCheckinComponents(components []runtime.ComponentComponentState) []fleetapi.CheckinComponent { if components == nil { return nil } @@ -307,7 +311,7 @@ func (f *fleetGateway) convertToCheckinComponents(components []runtime.Component return checkinComponents } -func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) { +func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) { ecsMeta, err := info.Metadata(ctx, f.log) if err != nil { f.log.Error(errors.New("failed to load metadata", err)) @@ -367,7 +371,7 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, } // shouldUnenroll checks if the max number of trying an invalid key is reached -func (f *fleetGateway) shouldUnenroll() bool { +func (f *FleetGateway) shouldUnenroll() bool { return f.unauthCounter > maxUnauthCounter } @@ -375,7 +379,7 @@ func isUnauth(err error) bool { return errors.Is(err, client.ErrInvalidAPIKey) } -func (f *fleetGateway) SetClient(c client.Sender) { +func (f *FleetGateway) SetClient(c client.Sender) { f.client = c } diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go index 51fda3581d4..04572eab845 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go @@ -23,7 +23,6 @@ import ( "gotest.tools/assert" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway" 
"github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/storage" "github.com/elastic/elastic-agent/internal/pkg/agent/storage/store" @@ -70,7 +69,7 @@ func newTestingClient() *testingClient { return &testingClient{received: make(chan struct{}, 1)} } -type withGatewayFunc func(*testing.T, gateway.FleetGateway, *testingClient, *scheduler.Stepper) +type withGatewayFunc func(*testing.T, coordinator.FleetGateway, *testingClient, *scheduler.Stepper) func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGatewayFunc) func(t *testing.T) { return func(t *testing.T) { @@ -128,7 +127,7 @@ func TestFleetGateway(t *testing.T) { t.Run("send no event and receive no action", withGateway(agentInfo, settings, func( t *testing.T, - gateway gateway.FleetGateway, + gateway coordinator.FleetGateway, client *testingClient, scheduler *scheduler.Stepper, ) { @@ -160,7 +159,7 @@ func TestFleetGateway(t *testing.T) { t.Run("Successfully connects and receives a series of actions", withGateway(agentInfo, settings, func( t *testing.T, - gateway gateway.FleetGateway, + gateway coordinator.FleetGateway, client *testingClient, scheduler *scheduler.Stepper, ) { @@ -321,7 +320,7 @@ func TestRetriesOnFailures(t *testing.T) { t.Run("When the gateway fails to communicate with the checkin API we will retry", withGateway(agentInfo, settings, func( t *testing.T, - gateway gateway.FleetGateway, + gateway coordinator.FleetGateway, client *testingClient, scheduler *scheduler.Stepper, ) { @@ -369,7 +368,7 @@ func TestRetriesOnFailures(t *testing.T) { Backoff: backoffSettings{Init: 10 * time.Minute, Max: 20 * time.Minute}, }, func( t *testing.T, - gateway gateway.FleetGateway, + gateway coordinator.FleetGateway, client *testingClient, scheduler *scheduler.Stepper, ) { @@ -404,7 +403,7 @@ func emptyStateFetcher() coordinator.State { return coordinator.State{} } -func runFleetGateway(ctx context.Context, g gateway.FleetGateway) 
<-chan error { +func runFleetGateway(ctx context.Context, g coordinator.FleetGateway) <-chan error { done := make(chan bool) errCh := make(chan error, 1) go func() { diff --git a/internal/pkg/agent/application/managed_mode.go b/internal/pkg/agent/application/managed_mode.go index 6895003c38e..f29be7fe258 100644 --- a/internal/pkg/agent/application/managed_mode.go +++ b/internal/pkg/agent/application/managed_mode.go @@ -13,7 +13,6 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/actions/handlers" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway" fleetgateway "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway/fleet" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" @@ -223,7 +222,7 @@ func (m *managedConfigManager) Run(ctx context.Context) error { } // runDispatcher passes actions collected from gateway to dispatcher or calls Dispatch with no actions every flushInterval. 
-func runDispatcher(ctx context.Context, actionDispatcher dispatcher.Dispatcher, fleetGateway gateway.FleetGateway, actionAcker acker.Acker, flushInterval time.Duration) { +func runDispatcher(ctx context.Context, actionDispatcher dispatcher.Dispatcher, fleetGateway coordinator.FleetGateway, actionAcker acker.Acker, flushInterval time.Duration) { t := time.NewTimer(flushInterval) for { select { From 2f06a5c68933562fc9859e27d93310ae6aea5a8e Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Fri, 8 Sep 2023 18:16:53 +0200 Subject: [PATCH 4/8] WIP - signal Fleet warning on elastic-agent status --- internal/pkg/fleetapi/checkin_cmd.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/pkg/fleetapi/checkin_cmd.go b/internal/pkg/fleetapi/checkin_cmd.go index 7150dade0f7..b52eeb3903f 100644 --- a/internal/pkg/fleetapi/checkin_cmd.go +++ b/internal/pkg/fleetapi/checkin_cmd.go @@ -77,8 +77,9 @@ func (e *CheckinRequest) Validate() error { // CheckinResponse is the response send back from the server which contains all the action that // need to be executed or proxy to running processes. type CheckinResponse struct { - AckToken string `json:"ack_token"` - Actions Actions `json:"actions"` + AckToken string `json:"ack_token"` + Actions Actions `json:"actions"` + FleetWarning string `json:"-"` } // Validate validates the response send from the server. @@ -140,6 +141,7 @@ func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinRe } checkinResponse := &CheckinResponse{} + checkinResponse.FleetWarning = resp.Header.Get("Warning") decoder := json.NewDecoder(bytes.NewReader(rs)) if err := decoder.Decode(checkinResponse); err != nil { return nil, sendDuration, errors.New(err, From bbee85cdf393cffdb410018be9693d5610975337 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Mon, 11 Sep 2023 12:21:24 +0200 Subject: [PATCH 5/8] fixup! 
WIP - signal Fleet warning on elastic-agent status --- .../agent/application/coordinator/coordinator.go | 2 +- .../application/coordinator/coordinator_test.go | 14 ++++++++++++++ .../pkg/agent/application/coordinator/gateway.go | 8 ++++++-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 21d1fde4f86..e6a2dbc182f 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -860,7 +860,7 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { c.setFleetState(agentclient.Healthy, "Connected") } else if errors.As(configErr, &wErr) { // we received a warning from Fleet, set state to degraded and the warning as state string - c.setFleetState(agentclient.Degraded, wErr.Warning) + c.setFleetState(agentclient.Degraded, wErr.Error()) } else { c.setFleetState(agentclient.Failed, configErr.Error()) } diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 53d4524a1cb..af51ed63523 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -222,6 +222,20 @@ func TestCoordinator_State_ConfigError_Managed(t *testing.T) { return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Healthy && state.FleetMessage == "Connected" }, 3*time.Second, 10*time.Millisecond) + // report a warning + cfgMgr.ReportError(ctx, NewWarningError("some msg from Fleet")) + assert.Eventually(t, func() bool { + state := coord.State() + return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Degraded && state.FleetMessage == "some msg from Fleet" + }, 3*time.Second, 10*time.Millisecond) + + // recover from 
warning error + cfgMgr.ReportError(ctx, nil) + assert.Eventually(t, func() bool { + state := coord.State() + return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Healthy && state.FleetMessage == "Connected" + }, 3*time.Second, 10*time.Millisecond) + cancel() err = <-coordCh require.NoError(t, err) diff --git a/internal/pkg/agent/application/coordinator/gateway.go b/internal/pkg/agent/application/coordinator/gateway.go index a0071daf4b0..f32dd439b8d 100644 --- a/internal/pkg/agent/application/coordinator/gateway.go +++ b/internal/pkg/agent/application/coordinator/gateway.go @@ -31,9 +31,13 @@ type FleetGateway interface { // WarningError is emitted when we receive a warning in the Fleet response type WarningError struct { - Warning string + msg string } func (w WarningError) Error() string { - return w.Warning + return w.msg +} + +func NewWarningError(warningMsg string) *WarningError { + return &WarningError{msg: warningMsg} } From 535a40fef3a782f4a813c12c176dff23ace2b841 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Mon, 11 Sep 2023 14:57:35 +0200 Subject: [PATCH 6/8] fixup! fixup! 
WIP - signal Fleet warning on elastic-agent status --- internal/pkg/agent/application/gateway/fleet/fleet_gateway.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go index 7a8bffebe78..31c81955a10 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go @@ -222,7 +222,7 @@ func (f *FleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee f.checkinFailCounter = 0 if resp.FleetWarning != "" { - f.errCh <- &coordinator.WarningError{Warning: resp.FleetWarning} + f.errCh <- coordinator.NewWarningError(resp.FleetWarning) } else { f.errCh <- nil } From e786604fbb2c2b033597f461f13635a1b7d2b3dc Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Mon, 11 Sep 2023 16:15:47 +0200 Subject: [PATCH 7/8] fixup! fixup! fixup! WIP - signal Fleet warning on elastic-agent status --- internal/pkg/fleetapi/client/client_test.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/internal/pkg/fleetapi/client/client_test.go b/internal/pkg/fleetapi/client/client_test.go index c68c2b5208f..c0b3fed0ba0 100644 --- a/internal/pkg/fleetapi/client/client_test.go +++ b/internal/pkg/fleetapi/client/client_test.go @@ -172,8 +172,9 @@ func TestElasticApiVersion(t *testing.T) { require.NoError(t, err) resp, err := clt.Send(ctx, http.MethodGet, "/", nil, nil, nil) - defer resp.Body.Close() - assert.NoError(t, err) + if assert.NoError(t, err) { + defer resp.Body.Close() + } }) t.Run("verify that we log a downgrade request", func(t *testing.T) { @@ -199,8 +200,9 @@ func TestElasticApiVersion(t *testing.T) { require.NoError(t, err) resp, err := clt.Send(ctx, http.MethodGet, "/downgrade", map[string][]string{"version": {"2020-01-01"}}, nil, nil) - defer resp.Body.Close() - assert.NoError(t, err) + if assert.NoError(t, err) { + defer 
resp.Body.Close() + } logs := obsLogs.FilterMessageSnippet("fleet requested a different api version \"2020-01-01\"").All() t.Logf("retrieved logs: %v", logs) assert.NotEmptyf(t, logs, "downgrade response was not logged") @@ -230,8 +232,9 @@ func TestElasticApiVersion(t *testing.T) { warningText := "API version is no longer supported. Upgrade immediately!" resp, err := clt.Send(ctx, http.MethodGet, "/warning", map[string][]string{"warning_msg": {warningText}}, nil, nil) - defer resp.Body.Close() - assert.NoError(t, err) + if assert.NoError(t, err) { + defer resp.Body.Close() + } logs := obsLogs.FilterMessageSnippet("API version is no longer supported. Upgrade immediately!").All() t.Logf("retrieved logs: %v", logs) assert.NotEmptyf(t, logs, "warning was not logged") From fb2d157a9d6030b7b62b90caaaad98f5d9aad13f Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Mon, 11 Sep 2023 16:35:38 +0200 Subject: [PATCH 8/8] fixup! fixup! fixup! fixup! WIP - signal Fleet warning on elastic-agent status --- internal/pkg/fleetapi/client/client_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/internal/pkg/fleetapi/client/client_test.go b/internal/pkg/fleetapi/client/client_test.go index c0b3fed0ba0..cc5d2501aa8 100644 --- a/internal/pkg/fleetapi/client/client_test.go +++ b/internal/pkg/fleetapi/client/client_test.go @@ -73,7 +73,10 @@ func TestHTTPClient(t *testing.T) { }) require.NoError(t, err) - _, err = client.Send(ctx, "GET", "/echo-hello", nil, nil, nil) + resp, err := client.Send(ctx, "GET", "/echo-hello", nil, nil, nil) + if err == nil { + defer resp.Body.Close() + } require.Error(t, err) }, )) @@ -120,7 +123,10 @@ func TestHTTPClient(t *testing.T) { }) require.NoError(t, err) - _, err = client.Send(timeoutCtx, "GET", "/echo-hello", nil, nil, nil) + resp, err := client.Send(timeoutCtx, "GET", "/echo-hello", nil, nil, nil) + if err == nil { + defer resp.Body.Close() + } require.Error(t, err) }) }