From 1b21d1dbfafc8017b4d079d1fbd55e70b5cec341 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 22 Nov 2024 21:07:46 +0000 Subject: [PATCH] Surface config parsing error under EA managed mode (#14574) By wrapping reloader errors in a way that EA manager understands (i.e. in a MultiError of UnitError), newRunner initialization errors, including config parsing errors, will be surfaced and logged. --- NOTICE.txt | 62 ++++---- go.mod | 2 +- go.sum | 4 + internal/beatcmd/beat_test.go | 238 ++++++++++++++++++++++++++++++ internal/beatcmd/reloader.go | 28 +++- internal/beatcmd/reloader_test.go | 4 +- 6 files changed, 301 insertions(+), 37 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index 283759d9dfc..ebba32f1c5e 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -3075,6 +3075,37 @@ Contents of probable licence file $GOMODCACHE/github.com/jaegertracing/jaeger@v1 limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/joeshaw/multierror +Version: v0.0.0-20140124173710-69b34d4ec901 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/joeshaw/multierror@v0.0.0-20140124173710-69b34d4ec901/LICENSE: + +The MIT License (MIT) + +Copyright (c) 2014 Joe Shaw + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + + -------------------------------------------------------------------------------- Dependency : github.com/libp2p/go-reuseport Version: v0.4.0 @@ -12821,37 +12852,6 @@ Contents of probable licence file $GOMODCACHE/github.com/jcmturner/rpc/v2@v2.0.3 limitations under the License. --------------------------------------------------------------------------------- -Dependency : github.com/joeshaw/multierror -Version: v0.0.0-20140124173710-69b34d4ec901 -Licence type (autodetected): MIT --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/joeshaw/multierror@v0.0.0-20140124173710-69b34d4ec901/LICENSE: - -The MIT License (MIT) - -Copyright (c) 2014 Joe Shaw - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. - - -------------------------------------------------------------------------------- Dependency : github.com/json-iterator/go Version: v1.1.12 diff --git a/go.mod b/go.mod index edcc3b38bbc..7870d1ae007 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/hashicorp/golang-lru v1.0.2 github.com/jaegertracing/jaeger v1.62.0 + github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 github.com/libp2p/go-reuseport v0.4.0 github.com/modern-go/reflect2 v1.0.2 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.111.0 @@ -111,7 +112,6 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/kr/pretty v0.3.1 // indirect diff --git a/go.sum b/go.sum index 567c8644a8b..46f8e03b199 100644 --- a/go.sum +++ b/go.sum @@ -162,6 +162,8 @@ github.com/elastic/go-windows v1.0.2 h1:yoLLsAsV5cfg9FLhZ9EXZ2n2sQFKeDYrHenkcivY github.com/elastic/go-windows v1.0.2/go.mod h1:bGcDpBzXgYSqM0Gx3DM4+UxFj300SZLixie9u9ixLM8= github.com/elastic/gosigar v0.14.3 h1:xwkKwPia+hSfg9GqrCUKYdId102m9qTJIIr7egmK/uo= github.com/elastic/gosigar v0.14.3/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= +github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015 h1:z8cC8GASpPo8yKlbnXI36HQ/BM9wYjhBPNbDjAWm0VU= +github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015/go.mod h1:qH9DX/Dmflz6EAtaks/+2SsdQzecVAKE174Zl66hk7E= github.com/elastic/opentelemetry-lib v0.12.0 h1:SiCGkT7gLOzkl0wQosQrIWjHWb5eJY18Cm3V3GmdU0o= github.com/elastic/opentelemetry-lib v0.12.0/go.mod h1:fdpkzh517xJqSGq3bo/fkdoX/Ag0OoanJoMoIDC3bBk= github.com/elastic/pkcs8 v1.0.0 h1:HhitlUKxhN288kcNcYkjW6/ouvuwJWd9ioxpjnD9jVA= @@ -300,6 +302,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mileusna/useragent v1.3.4 h1:MiuRRuvGjEie1+yZHO88UBYg8YBC/ddF6T7F56i3PCk= +github.com/mileusna/useragent v1.3.4/go.mod h1:3d8TOmwL/5I8pJjyVDteHtgDGcefrFUX4ccGOMKNYYc= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/hashstructure v1.1.0 h1:P6P1hdjqAAknpY/M1CGipelZgp+4y9ja9kmUZPXP+H0= github.com/mitchellh/hashstructure v1.1.0/go.mod h1:xUDAozZz0Wmdiufv0uyhnHkUTN6/6d8ulp4AwfLKrmA= diff --git a/internal/beatcmd/beat_test.go b/internal/beatcmd/beat_test.go index 7664315f8bf..c105beee999 100644 --- a/internal/beatcmd/beat_test.go +++ b/internal/beatcmd/beat_test.go @@ -19,19 +19,28 @@ package beatcmd import ( "context" + "errors" "fmt" "os" "strconv" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/elastic/apm-server/internal/version" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/libbeat/tests/integration" + xpacklbmanagement "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" @@ -190,6 +199,235 @@ func TestRunManager(t *testing.T) { expectEvent(t, manager.stopped, "manager should have been stopped") } +func TestRunManager_Reloader(t *testing.T) { + // This test asserts that unit changes are reloaded correctly. + + finish := make(chan struct{}) + runCount := atomic.Int64{} + stopCount := atomic.Int64{} + + registry := reload.NewRegistry() + + reloader, err := NewReloader(beat.Info{}, registry, func(p RunnerParams) (Runner, error) { + return runnerFunc(func(ctx context.Context) error { + revision, err := p.Config.Int("revision", -1) + require.NoError(t, err) + if revision == 2 { + close(finish) + } + runCount.Add(1) + <-ctx.Done() + stopCount.Add(1) + return nil + }), nil + }) + require.NoError(t, err) + + agentInfo := &proto.AgentInfo{ + Id: "elastic-agent-id", + Version: version.Version, + Snapshot: true, + } + srv := integration.NewMockServer([]*proto.CheckinExpected{ + { + AgentInfo: agentInfo, + Units: []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + Config: &proto.UnitExpectedConfig{ + Id: "elastic-apm", + Type: "apm", + Name: "Elastic APM", + Streams: []*proto.Stream{ + { + Id: "elastic-apm", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "revision": 1, + }), + }, + }, + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + }, + Features: nil, + FeaturesIdx: 1, + }, + { + AgentInfo: agentInfo, + Units: []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + { + Id: "elastic-apm", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 2, + Config: &proto.UnitExpectedConfig{ + Id: "elastic-apm", + Type: "apm", + Name: "Elastic APM", + Streams: []*proto.Stream{ + { + Id: "elastic-apm", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "revision": 2, + }), + }, + }, + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + }, + Features: nil, + FeaturesIdx: 1, + }, + }, + nil, + 500*time.Millisecond, + ) + require.NoError(t, srv.Start()) + defer srv.Stop() + + client := client.NewV2( + fmt.Sprintf(":%d", srv.Port), + "", + client.VersionInfo{}, + client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) + manager, err := xpacklbmanagement.NewV2AgentManagerWithClient(&xpacklbmanagement.Config{ + Enabled: true, + }, registry, client) + require.NoError(t, err) + + err = manager.Start() + require.NoError(t, err) + defer manager.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-finish + cancel() + }() + err = reloader.Run(ctx) + require.NoError(t, err) + + assert.Eventually(t, func() bool { + // TODO(carsonip): There's a bug in EA manager causing an extra reload even if apm tracing config did not change + // see https://github.com/elastic/apm-server/issues/14580. + //return runCount.Load() == 2 && stopCount.Load() == 2 + return runCount.Load() == 3 && stopCount.Load() == 3 + }, 2*time.Second, 50*time.Millisecond) +} + +func TestRunManager_Reloader_newRunnerError(t *testing.T) { + // This test asserts that any errors when creating runner inside reloader, e.g. config parsing error, + // will cause the unit to fail. + + inputFailedMsg := make(chan string) + + registry := reload.NewRegistry() + + _, err := NewReloader(beat.Info{}, registry, func(_ RunnerParams) (Runner, error) { + return nil, errors.New("newRunner error") + }) + require.NoError(t, err) + + onObserved := func(observed *proto.CheckinObserved, currentIdx int) { + for _, unit := range observed.GetUnits() { + if unit.GetId() == "input-unit-1" && unit.GetState() == proto.State_FAILED { + inputFailedMsg <- unit.GetMessage() + } + } + } + agentInfo := &proto.AgentInfo{ + Id: "elastic-agent-id", + Version: version.Version, + Snapshot: true, + } + srv := integration.NewMockServer([]*proto.CheckinExpected{ + { + AgentInfo: agentInfo, + Units: []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + Config: &proto.UnitExpectedConfig{ + Id: "elastic-apm", + Type: "apm", + Name: "Elastic APM", + Streams: []*proto.Stream{ + { + Id: "elastic-apm", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "revision": 1, + }), + }, + }, + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + }, + Features: nil, + FeaturesIdx: 1, + }, + }, + onObserved, + 500*time.Millisecond, + ) + require.NoError(t, srv.Start()) + defer srv.Stop() + + client := client.NewV2( + fmt.Sprintf(":%d", srv.Port), + "", + client.VersionInfo{}, + client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) + manager, err := xpacklbmanagement.NewV2AgentManagerWithClient(&xpacklbmanagement.Config{ + Enabled: true, + }, registry, client) + require.NoError(t, err) + + err = manager.Start() + require.NoError(t, err) + defer manager.Stop() + + assert.Equal(t, "failed to load input config: newRunner error", <-inputFailedMsg) +} + func runBeat(t testing.TB, beat *Beat) (stop func() error) { ctx, cancel := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) diff --git a/internal/beatcmd/reloader.go b/internal/beatcmd/reloader.go index e9244b9af4a..27da683e3dc 100644 --- a/internal/beatcmd/reloader.go +++ b/internal/beatcmd/reloader.go @@ -23,9 +23,11 @@ import ( "fmt" "sync" + "github.com/joeshaw/multierror" "golang.org/x/sync/errgroup" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -109,11 +111,21 @@ func (r *Reloader) Run(ctx context.Context) error { } // reloadInput (re)loads input configuration. +// It returns a *multierror.MultiError as libbeat manager error handling is tightly coupled +// with its own reloadable list implementation in libbeat/cfgfile/list.go. // // Note: reloadInputs may be called before the Reloader is running. func (r *Reloader) reloadInputs(configs []*reload.ConfigWithMeta) error { if n := len(configs); n != 1 { - return fmt.Errorf("only 1 input supported, got %d", n) + var errs multierror.Errors + for _, cfg := range configs { + unitErr := cfgfile.UnitError{ + Err: fmt.Errorf("only 1 input supported, got %d", n), + UnitID: cfg.InputUnitID, + } + errs = append(errs, unitErr) + } + return errs.Err() } r.mu.Lock() defer r.mu.Unlock() @@ -123,11 +135,21 @@ func (r *Reloader) reloadInputs(configs []*reload.ConfigWithMeta) error { // increasing revision number. revision, err := cfg.Int("revision", -1) if err != nil { - return fmt.Errorf("failed to extract input config revision: %w", err) + return multierror.Errors{ + cfgfile.UnitError{ + Err: fmt.Errorf("failed to extract input config revision: %w", err), + UnitID: configs[0].InputUnitID, + }, + }.Err() } if err := r.reload(cfg, r.outputConfig, r.apmTracingConfig); err != nil { - return fmt.Errorf("failed to load input config: %w", err) + return multierror.Errors{ + cfgfile.UnitError{ + Err: fmt.Errorf("failed to load input config: %w", err), + UnitID: configs[0].InputUnitID, + }, + }.Err() } r.inputConfig = cfg r.logger.With(logp.Int64("revision", revision)).Info("loaded input config") diff --git a/internal/beatcmd/reloader_test.go b/internal/beatcmd/reloader_test.go index 9e2bd03e837..4f90d0d5f7f 100644 --- a/internal/beatcmd/reloader_test.go +++ b/internal/beatcmd/reloader_test.go @@ -87,7 +87,7 @@ func TestReloader(t *testing.T) { err = registry.GetInputList().Reload([]*reload.ConfigWithMeta{{ Config: config.MustNewConfigFrom(`{}`), }}) - assert.EqualError(t, err, "failed to extract input config revision: missing field accessing 'revision'") + assert.EqualError(t, err, "1 error: failed to extract input config revision: missing field accessing 'revision'") assertNoReload() err = registry.GetInputList().Reload([]*reload.ConfigWithMeta{{ @@ -119,7 +119,7 @@ func TestReloader(t *testing.T) { err = registry.GetInputList().Reload([]*reload.ConfigWithMeta{{ Config: config.MustNewConfigFrom(`{"revision": 2, "error": true}`), }}) - assert.EqualError(t, err, "failed to load input config: no runner for you") + assert.EqualError(t, err, "1 error: failed to load input config: no runner for you") assertNoReload() // error occurred during reload, nothing changes expectNoEvent(t, r1.stopped, "runner should not have been stopped")