From 0a79947e9b1fa1f0f2267db37807c346f7161443 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 1 Jun 2022 17:20:35 +0200 Subject: [PATCH] fixes after moving things around --- pkg/feature/feature.go | 4 +- pkg/feature/registry_test.go | 47 +++++++---- pkg/manager/input/error.go | 2 +- pkg/manager/input/input-cursor/clean.go | 2 +- pkg/manager/input/input-cursor/clean_test.go | 12 +-- pkg/manager/input/input-cursor/cursor_test.go | 16 ++-- pkg/manager/input/input-cursor/input.go | 21 +++-- pkg/manager/input/input-cursor/manager.go | 16 ++-- .../input/input-cursor/manager_test.go | 84 +++++++++++-------- pkg/manager/input/input-cursor/publish.go | 20 ++--- .../input/input-cursor/publish_test.go | 38 +++++---- pkg/manager/input/input-cursor/store.go | 10 +-- pkg/manager/input/input-cursor/store_test.go | 32 ++++--- .../input/input-stateless/stateless.go | 26 +++--- .../input/input-stateless/stateless_test.go | 55 ++++++------ pkg/manager/input/input.go | 35 ++++++-- .../input/internal/inputest/inputest.go | 38 ++++----- pkg/manager/input/internal/inputest/loader.go | 10 +-- pkg/manager/input/loader.go | 4 +- pkg/manager/input/loader_test.go | 4 +- pkg/manager/input/mode_string.go | 2 +- pkg/manager/input/plugin.go | 4 +- pkg/manager/input/plugin_test.go | 4 +- pkg/manager/input/simplemanager.go | 2 +- pkg/manager/input/util_test.go | 8 +- pkg/manager/internal/resources/goroutines.go | 5 +- pkg/publisher/acker/acker.go | 50 +++++------ pkg/publisher/acker/acker_test.go | 80 +++++++++--------- pkg/publisher/event.go | 4 + pkg/publisher/pipeline.go | 2 +- pkg/publisher/testing/connector.go | 48 +++++------ pkg/publisher/testing/testing.go | 33 ++++---- pkg/publisher/testing/testing_test.go | 8 +- pkg/statestore/backend/memlog/diskstore.go | 2 +- pkg/statestore/backend/memlog/doc.go | 2 +- pkg/statestore/backend/memlog/memlog.go | 2 +- pkg/statestore/backend/memlog/memlog_test.go | 18 ++-- pkg/statestore/backend/memlog/store.go | 4 +- pkg/statestore/backend/memlog/util.go | 5 +- pkg/statestore/backend/memlog/util_test.go | 3 +- pkg/statestore/cleanup/cleanup.go | 4 +- pkg/statestore/cleanup/cleanup_test.go | 2 +- pkg/statestore/error.go | 7 +- .../internal/storecompliance/reg.go | 16 ++-- .../storecompliance/storecompliance.go | 2 +- .../internal/storecompliance/util.go | 4 +- pkg/statestore/mock_test.go | 10 ++- pkg/statestore/registry.go | 6 +- pkg/statestore/store.go | 2 +- pkg/statestore/store_test.go | 2 +- pkg/statestore/storetest/storetest.go | 6 +- pkg/statestore/storetest/storetest_test.go | 9 +- 52 files changed, 445 insertions(+), 387 deletions(-) diff --git a/pkg/feature/feature.go b/pkg/feature/feature.go index ec82fa2..d3fd89d 100644 --- a/pkg/feature/feature.go +++ b/pkg/feature/feature.go @@ -42,7 +42,7 @@ type Featurable interface { // of the method is type checked by the 'FindFactory' of each namespace. Factory() interface{} - // Description return the avaiable information for a specific feature. + // Description return the available information for a specific feature. Description() Details String() string @@ -71,7 +71,7 @@ func (f *Feature) Factory() interface{} { return f.factory } -// Description return the avaiable information for a specific feature. +// Description return the available information for a specific feature. func (f *Feature) Description() Details { return f.description } diff --git a/pkg/feature/registry_test.go b/pkg/feature/registry_test.go index 7882e44..4b51300 100644 --- a/pkg/feature/registry_test.go +++ b/pkg/feature/registry_test.go @@ -48,7 +48,7 @@ func TestRegister(t *testing.T) { t.Run("namespace exists and feature doesn't exist", func(t *testing.T) { r := NewRegistry() - r.Register(New("processor", "bar", f, defaultDetails)) + mustRegister(r, New("processor", "bar", f, defaultDetails)) err := r.Register(New("processor", "foo", f, defaultDetails)) if !assert.NoError(t, err) { return @@ -59,7 +59,7 @@ func TestRegister(t *testing.T) { t.Run("namespace exists and feature exists and not the same factory", func(t *testing.T) { r := NewRegistry() - r.Register(New("processor", "foo", func() {}, defaultDetails)) + mustRegister(r, New("processor", "foo", func() {}, defaultDetails)) err := r.Register(New("processor", "foo", f, defaultDetails)) if !assert.Error(t, err) { return @@ -70,7 +70,7 @@ func TestRegister(t *testing.T) { t.Run("when the exact feature is already registered", func(t *testing.T) { feature := New("processor", "foo", f, defaultDetails) r := NewRegistry() - r.Register(feature) + mustRegister(r, feature) err := r.Register(feature) if !assert.NoError(t, err) { return @@ -79,12 +79,19 @@ func TestRegister(t *testing.T) { }) } +func mustRegister(r *Registry, f Featurable) { + err := r.Register(f) + if err != nil { + panic(err) + } +} + func TestFeature(t *testing.T) { f := func() {} r := NewRegistry() - r.Register(New("processor", "foo", f, defaultDetails)) - r.Register(New("HOLA", "fOO", f, defaultDetails)) + mustRegister(r, New("processor", "foo", f, defaultDetails)) + mustRegister(r, New("HOLA", "fOO", f, defaultDetails)) t.Run("when namespace and feature are present", func(t *testing.T) { feature, err := r.Lookup("processor", "foo") @@ -113,9 +120,9 @@ func TestLookup(t *testing.T) { f := func() {} r := NewRegistry() - r.Register(New("processor", "foo", f, defaultDetails)) - r.Register(New("processor", "foo2", f, defaultDetails)) - r.Register(New("HELLO", "fOO", f, defaultDetails)) + mustRegister(r, New("processor", "foo", f, defaultDetails)) + mustRegister(r, New("processor", "foo2", f, defaultDetails)) + mustRegister(r, New("HELLO", "fOO", f, defaultDetails)) t.Run("when namespace and feature are present", func(t *testing.T) { features, err := r.LookupAll("processor") @@ -147,9 +154,10 @@ func TestUnregister(t *testing.T) { t.Run("when the namespace and the feature exists", func(t *testing.T) { r := NewRegistry() - r.Register(New("processor", "foo", f, defaultDetails)) + err := r.Register(New("processor", "foo", f, defaultDetails)) + assert.NoError(t, err) assert.Equal(t, 1, r.Size()) - err := r.Unregister("processor", "foo") + err = r.Unregister("processor", "foo") if !assert.NoError(t, err) { return } @@ -158,9 +166,10 @@ func TestUnregister(t *testing.T) { t.Run("when the namespace exist and the feature doesn't", func(t *testing.T) { r := NewRegistry() - r.Register(New("processor", "foo", f, defaultDetails)) + err := r.Register(New("processor", "foo", f, defaultDetails)) + assert.NoError(t, err) assert.Equal(t, 1, r.Size()) - err := r.Unregister("processor", "bar") + err = r.Unregister("processor", "bar") if assert.Error(t, err) { return } @@ -169,9 +178,10 @@ func TestUnregister(t *testing.T) { t.Run("when the namespace doesn't exists", func(t *testing.T) { r := NewRegistry() - r.Register(New("processor", "foo", f, defaultDetails)) + err := r.Register(New("processor", "foo", f, defaultDetails)) + assert.NoError(t, err) assert.Equal(t, 1, r.Size()) - err := r.Unregister("outputs", "bar") + err = r.Unregister("outputs", "bar") if assert.Error(t, err) { return } @@ -184,18 +194,21 @@ func TestOverwrite(t *testing.T) { f := func() {} r := NewRegistry() assert.Equal(t, 0, r.Size()) - r.Overwrite(New("processor", "foo", f, defaultDetails)) + err := r.Overwrite(New("processor", "foo", f, defaultDetails)) + assert.NoError(t, err) assert.Equal(t, 1, r.Size()) }) t.Run("overwrite when the feature exists", func(t *testing.T) { f := func() {} r := NewRegistry() - r.Register(New("processor", "foo", f, defaultDetails)) + err := r.Register(New("processor", "foo", f, defaultDetails)) + assert.NoError(t, err) assert.Equal(t, 1, r.Size()) check := 42 - r.Overwrite(New("processor", "foo", check, defaultDetails)) + err = r.Overwrite(New("processor", "foo", check, defaultDetails)) + assert.NoError(t, err) assert.Equal(t, 1, r.Size()) feature, err := r.Lookup("processor", "foo") diff --git a/pkg/manager/input/error.go b/pkg/manager/input/error.go index eeab5a2..168386a 100644 --- a/pkg/manager/input/error.go +++ b/pkg/manager/input/error.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package v2 +package input import ( "errors" diff --git a/pkg/manager/input/input-cursor/clean.go b/pkg/manager/input/input-cursor/clean.go index 92124f4..7113dec 100644 --- a/pkg/manager/input/input-cursor/clean.go +++ b/pkg/manager/input/input-cursor/clean.go @@ -44,7 +44,7 @@ type cleaner struct { // once the last event has been ACKed. func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Duration) { started := time.Now() - timed.Periodic(canceler, interval, func() error { + _ = timed.Periodic(canceler, interval, func() error { gcStore(c.log, started, store) return nil }) diff --git a/pkg/manager/input/input-cursor/clean_test.go b/pkg/manager/input/input-cursor/clean_test.go index fb45860..b8f60bd 100644 --- a/pkg/manager/input/input-cursor/clean_test.go +++ b/pkg/manager/input/input-cursor/clean_test.go @@ -31,7 +31,7 @@ func TestGCStore(t *testing.T) { started := time.Now() backend := createSampleStore(t, nil) - store := testOpenStore(t, "test", backend) + store := testOpenStore(t, backend) defer store.Release() gcStore(logp.NewLogger("test"), started, store) @@ -52,7 +52,7 @@ func TestGCStore(t *testing.T) { } backend := createSampleStore(t, initState) - store := testOpenStore(t, "test", backend) + store := testOpenStore(t, backend) defer store.Release() gcStore(logp.NewLogger("test"), started, store) @@ -72,7 +72,7 @@ func TestGCStore(t *testing.T) { } backend := createSampleStore(t, initState) - store := testOpenStore(t, "test", backend) + store := testOpenStore(t, backend) defer store.Release() gcStore(logp.NewLogger("test"), started, store) @@ -93,7 +93,7 @@ func TestGCStore(t *testing.T) { } backend := createSampleStore(t, initState) - store := testOpenStore(t, "test", backend) + store := testOpenStore(t, backend) defer store.Release() gcStore(logp.NewLogger("test"), started, store) @@ -113,7 +113,7 @@ func TestGCStore(t *testing.T) { } backend := createSampleStore(t, initState) - store := testOpenStore(t, "test", backend) + store := testOpenStore(t, backend) defer store.Release() // access resource and check it is not gc'ed @@ -140,7 +140,7 @@ func TestGCStore(t *testing.T) { } backend := createSampleStore(t, initState) - store := testOpenStore(t, "test", backend) + store := testOpenStore(t, backend) defer store.Release() // create pending update operation diff --git a/pkg/manager/input/input-cursor/cursor_test.go b/pkg/manager/input/input-cursor/cursor_test.go index ed1f412..238bb3a 100644 --- a/pkg/manager/input/input-cursor/cursor_test.go +++ b/pkg/manager/input/input-cursor/cursor_test.go @@ -25,7 +25,7 @@ import ( func TestCursor_IsNew(t *testing.T) { t.Run("true if key is not in store", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() cursor := makeCursor(store, store.Get("test::key")) @@ -33,7 +33,7 @@ func TestCursor_IsNew(t *testing.T) { }) t.Run("true if key is in store but without cursor value", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + store := testOpenStore(t, createSampleStore(t, map[string]state{ "test::key": {Cursor: nil}, })) defer store.Release() @@ -43,7 +43,7 @@ func TestCursor_IsNew(t *testing.T) { }) t.Run("false if key with cursor value is in persistent store", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + store := testOpenStore(t, createSampleStore(t, map[string]state{ "test::key": {Cursor: "test"}, })) defer store.Release() @@ -53,7 +53,7 @@ func TestCursor_IsNew(t *testing.T) { }) t.Run("false if key with cursor value is in memory store only", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + store := testOpenStore(t, createSampleStore(t, map[string]state{ "test::key": {Cursor: nil}, })) defer store.Release() @@ -70,7 +70,7 @@ func TestCursor_IsNew(t *testing.T) { func TestCursor_Unpack(t *testing.T) { t.Run("nothing to unpack if key is new", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() var st string @@ -81,7 +81,7 @@ func TestCursor_Unpack(t *testing.T) { }) t.Run("unpack fails if types are not compatible", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + store := testOpenStore(t, createSampleStore(t, map[string]state{ "test::key": {Cursor: "test"}, })) defer store.Release() @@ -92,7 +92,7 @@ func TestCursor_Unpack(t *testing.T) { }) t.Run("unpack from state in persistent store", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + store := testOpenStore(t, createSampleStore(t, map[string]state{ "test::key": {Cursor: "test"}, })) defer store.Release() @@ -105,7 +105,7 @@ func TestCursor_Unpack(t *testing.T) { }) t.Run("unpack from in memory state if updates are pending", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + store := testOpenStore(t, createSampleStore(t, map[string]state{ "test::key": {Cursor: "test"}, })) defer store.Release() diff --git a/pkg/manager/input/input-cursor/input.go b/pkg/manager/input/input-cursor/input.go index 92879df..0d81626 100644 --- a/pkg/manager/input/input-cursor/input.go +++ b/pkg/manager/input/input-cursor/input.go @@ -28,10 +28,9 @@ import ( "github.com/elastic/go-concert/ctxtool" "github.com/elastic/go-concert/unison" - input "github.com/elastic/beats/v7/filebeat/input/v2" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/acker" - "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-inputs/pkg/manager/input" + "github.com/elastic/elastic-agent-inputs/pkg/publisher" + "github.com/elastic/elastic-agent-inputs/pkg/publisher/acker" ) // Input interface for cursor based inputs. This interface must be implemented @@ -102,7 +101,7 @@ func (inp *managedInput) testSource(ctx input.TestContext, source Source) (err e // issue, but not crash the whole process. func (inp *managedInput) Run( ctx input.Context, - pipeline beat.PipelineConnector, + pipeline publisher.PipelineConnector, ) (err error) { // Setup cancellation using a custom cancel context. All workers will be // stopped if one failed badly by returning an error. @@ -136,7 +135,7 @@ func (inp *managedInput) runSource( ctx input.Context, store *store, source Source, - pipeline beat.PipelineConnector, + pipeline publisher.PipelineConnector, ) (err error) { defer func() { if v := recover(); v != nil { @@ -145,9 +144,9 @@ func (inp *managedInput) runSource( } }() - client, err := pipeline.ConnectWith(beat.ClientConfig{ + client, err := pipeline.ConnectWith(publisher.ClientConfig{ CloseRef: ctx.Cancelation, - ACKHandler: newInputACKHandler(ctx.Logger), + ACKHandler: newInputACKHandler(), }) if err != nil { return err @@ -164,8 +163,8 @@ func (inp *managedInput) runSource( store.UpdateTTL(resource, inp.cleanTimeout) cursor := makeCursor(store, resource) - publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor} - return inp.input.Run(ctx, source, cursor, publisher) + p := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor} + return inp.input.Run(ctx, source, cursor, p) } func (inp *managedInput) createSourceID(s Source) string { @@ -175,7 +174,7 @@ func (inp *managedInput) createSourceID(s Source) string { return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name()) } -func newInputACKHandler(log *logp.Logger) beat.ACKer { +func newInputACKHandler() publisher.ACKer { return acker.EventPrivateReporter(func(acked int, private []interface{}) { var n uint var last int diff --git a/pkg/manager/input/input-cursor/manager.go b/pkg/manager/input/input-cursor/manager.go index bb0b526..04d9ca6 100644 --- a/pkg/manager/input/input-cursor/manager.go +++ b/pkg/manager/input/input-cursor/manager.go @@ -27,8 +27,8 @@ import ( "github.com/elastic/go-concert/unison" - v2 "github.com/elastic/beats/v7/filebeat/input/v2" - "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/elastic-agent-inputs/pkg/manager/input" + "github.com/elastic/elastic-agent-inputs/pkg/statestore" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -108,8 +108,8 @@ func (cim *InputManager) init() error { // Init starts background processes for deleting old entries from the // persistent store if mode is ModeRun. -func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error { - if mode != v2.ModeRun { +func (cim *InputManager) Init(group unison.Group, mode input.Mode) error { + if mode != input.ModeRun { return nil } @@ -145,9 +145,9 @@ func (cim *InputManager) shutdown() { cim.store.Release() } -// Create builds a new v2.Input using the provided Configure function. +// Create builds a new input.Input using the provided Configure function. // The Input will run a go-routine per source that has been configured. -func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { +func (cim *InputManager) Create(config *conf.C) (input.Input, error) { if err := cim.init(); err != nil { return nil, err } @@ -182,7 +182,7 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { // Lock locks a key for exclusive access and returns an resource that can be used to modify // the cursor state and unlock the key. -func (cim *InputManager) lock(ctx v2.Context, key string) (*resource, error) { +func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error) { resource := cim.store.Get(key) err := lockResource(ctx.Logger, resource, ctx.Cancelation) if err != nil { @@ -192,7 +192,7 @@ func (cim *InputManager) lock(ctx v2.Context, key string) (*resource, error) { return resource, nil } -func lockResource(log *logp.Logger, resource *resource, canceler v2.Canceler) error { +func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error { if !resource.lock.TryLock() { log.Infof("Resource '%v' currently in use, waiting...", resource.key) err := resource.lock.LockContext(canceler) diff --git a/pkg/manager/input/input-cursor/manager_test.go b/pkg/manager/input/input-cursor/manager_test.go index 6b97763..7388ba6 100644 --- a/pkg/manager/input/input-cursor/manager_test.go +++ b/pkg/manager/input/input-cursor/manager_test.go @@ -30,11 +30,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - input "github.com/elastic/beats/v7/filebeat/input/v2" - v2 "github.com/elastic/beats/v7/filebeat/input/v2" - "github.com/elastic/beats/v7/libbeat/beat" - pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" - "github.com/elastic/beats/v7/libbeat/tests/resources" + "github.com/elastic/elastic-agent-inputs/pkg/manager/input" + "github.com/elastic/elastic-agent-inputs/pkg/manager/internal/resources" + "github.com/elastic/elastic-agent-inputs/pkg/publisher" + pubtest "github.com/elastic/elastic-agent-inputs/pkg/publisher/testing" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" @@ -63,11 +62,11 @@ func TestManager_Init(t *testing.T) { DefaultCleanTimeout: 10 * time.Millisecond, } - err := manager.Init(&grp, v2.ModeRun) + err := manager.Init(&grp, input.ModeRun) require.NoError(t, err) time.Sleep(200 * time.Millisecond) - grp.Stop() + _ = grp.Stop() // wait for all go-routines to be gone @@ -86,7 +85,9 @@ func TestManager_Init(t *testing.T) { store.GCPeriod = 10 * time.Millisecond var grp unison.TaskGroup - defer grp.Stop() + defer func() { + _ = grp.Stop() + }() manager := &InputManager{ Logger: logp.NewLogger("test"), StateStore: store, @@ -94,7 +95,7 @@ func TestManager_Init(t *testing.T) { DefaultCleanTimeout: 10 * time.Millisecond, } - err := manager.Init(&grp, v2.ModeRun) + err := manager.Init(&grp, input.ModeRun) require.NoError(t, err) for len(store.snapshot()) > 0 { @@ -157,7 +158,7 @@ func TestManager_InputsTest(t *testing.T) { defer resources.NewGoroutinesChecker().Check(t) manager := constInput(t, sources, &fakeTestInput{ - OnTest: func(source Source, _ v2.TestContext) error { + OnTest: func(source Source, _ input.TestContext) error { mu.Lock() defer mu.Unlock() seen = append(seen, source.Name()) @@ -179,7 +180,7 @@ func TestManager_InputsTest(t *testing.T) { defer resources.NewGoroutinesChecker().Check(t) manager := constInput(t, sources, &fakeTestInput{ - OnTest: func(_ Source, ctx v2.TestContext) error { + OnTest: func(_ Source, ctx input.TestContext) error { <-ctx.Cancelation.Done() return nil }, @@ -209,7 +210,7 @@ func TestManager_InputsTest(t *testing.T) { sources := []Source{failing, stringSource("source2")} manager := constInput(t, sources, &fakeTestInput{ - OnTest: func(source Source, _ v2.TestContext) error { + OnTest: func(source Source, _ input.TestContext) error { if source == failing { t.Log("return error") return errors.New("oops") @@ -238,7 +239,7 @@ func TestManager_InputsTest(t *testing.T) { defer resources.NewGoroutinesChecker().Check(t) manager := constInput(t, sources, &fakeTestInput{ - OnTest: func(source Source, _ v2.TestContext) error { + OnTest: func(source Source, _ input.TestContext) error { panic("oops") }, }) @@ -278,7 +279,7 @@ func TestManager_InputsRun(t *testing.T) { defer cancel() var clientCounters pubtest.ClientCounter - err = inp.Run(v2.Context{ + err = inp.Run(input.Context{ Logger: manager.Logger, Cancelation: cancelCtx, }, clientCounters.BuildConnector()) @@ -302,7 +303,7 @@ func TestManager_InputsRun(t *testing.T) { defer cancel() var clientCounters pubtest.ClientCounter - err = inp.Run(v2.Context{ + err = inp.Run(input.Context{ Logger: manager.Logger, Cancelation: cancelCtx, }, clientCounters.BuildConnector()) @@ -331,7 +332,7 @@ func TestManager_InputsRun(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err = inp.Run(v2.Context{ + err = inp.Run(input.Context{ Logger: manager.Logger, Cancelation: cancelCtx, }, clientCounters.BuildConnector()) @@ -348,7 +349,7 @@ func TestManager_InputsRun(t *testing.T) { type runConfig struct{ Max int } - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() manager := simpleManagerWithConfigure(t, func(cfg *conf.C) ([]Source, Input, error) { @@ -367,9 +368,9 @@ func TestManager_InputsRun(t *testing.T) { } for i := 0; i < config.Max; i++ { - event := beat.Event{Fields: mapstr.M{"n": state.N}} + event := publisher.Event{Fields: mapstr.M{"n": state.N}} state.N++ - pub.Publish(event, state) + mustPublish(pub, event, state) } return nil }, @@ -380,8 +381,11 @@ func TestManager_InputsRun(t *testing.T) { var ids []int pipeline := pubtest.ConstClient(&pubtest.FakeClient{ - PublishFunc: func(event beat.Event) { - id := event.Fields["n"].(int) + PublishFunc: func(event publisher.Event) { + id, ok := event.Fields["n"].(int) + if !ok { + panic(fmt.Errorf("cannot convert id to int")) + } ids = append(ids, id) }, }) @@ -397,10 +401,11 @@ func TestManager_InputsRun(t *testing.T) { // create and run second instance instance inp, err = manager.Create(conf.MustNewConfigFrom(runConfig{Max: 3})) require.NoError(t, err) - inp.Run(input.Context{ + err = inp.Run(input.Context{ Logger: log, Cancelation: context.Background(), }, pipeline) + assert.NoError(t, err) // verify assert.Equal(t, []int{0, 1, 2, 3, 4, 5}, ids) @@ -416,13 +421,13 @@ func TestManager_InputsRun(t *testing.T) { OnRun: func(ctx input.Context, _ Source, _ Cursor, pub Publisher) error { defer wgSend.Done() fields := mapstr.M{"hello": "world"} - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state1") - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state2") - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state3") - pub.Publish(beat.Event{Fields: fields}, nil) - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state4") - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state5") - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state6") + mustPublish(pub, publisher.Event{Fields: fields}, "test-cursor-state1") + mustPublish(pub, publisher.Event{Fields: fields}, "test-cursor-state2") + mustPublish(pub, publisher.Event{Fields: fields}, "test-cursor-state3") + mustPublish(pub, publisher.Event{Fields: fields}, nil) + mustPublish(pub, publisher.Event{Fields: fields}, "test-cursor-state4") + mustPublish(pub, publisher.Event{Fields: fields}, "test-cursor-state5") + mustPublish(pub, publisher.Event{Fields: fields}, "test-cursor-state6") return nil }, }) @@ -435,15 +440,15 @@ func TestManager_InputsRun(t *testing.T) { defer cancel() // setup publishing pipeline and capture ACKer, so we can simulate progress in the Output - var acker beat.ACKer + var acker publisher.ACKer var wgACKer sync.WaitGroup wgACKer.Add(1) pipeline := &pubtest.FakeConnector{ - ConnectFunc: func(cfg beat.ClientConfig) (beat.Client, error) { + ConnectFunc: func(cfg publisher.ClientConfig) (publisher.Client, error) { defer wgACKer.Done() acker = cfg.ACKHandler return &pubtest.FakeClient{ - PublishFunc: func(event beat.Event) { + PublishFunc: func(event publisher.Event) { acker.AddEvent(event, true) }, }, nil @@ -455,7 +460,7 @@ func TestManager_InputsRun(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err = inp.Run(v2.Context{ + err = inp.Run(input.Context{ Logger: manager.Logger, Cancelation: cancelCtx, }, pipeline) @@ -488,9 +493,16 @@ func TestManager_InputsRun(t *testing.T) { }) } +func mustPublish(p Publisher, e publisher.Event, cursor interface{}) { + err := p.Publish(e, cursor) + if err != nil { + panic(err) + } +} + func TestLockResource(t *testing.T) { t.Run("can lock unused resource", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() res := store.Get("test::key") @@ -501,7 +513,7 @@ func TestLockResource(t *testing.T) { t.Run("fail to lock resource in use when context is cancelled", func(t *testing.T) { log := logp.NewLogger("test") - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() resUsed := store.Get("test::key") @@ -524,7 +536,7 @@ func TestLockResource(t *testing.T) { t.Run("succeed to lock resource after it has been released", func(t *testing.T) { log := logp.NewLogger("test") - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() resUsed := store.Get("test::key") diff --git a/pkg/manager/input/input-cursor/publish.go b/pkg/manager/input/input-cursor/publish.go index 923c212..c20ac1b 100644 --- a/pkg/manager/input/input-cursor/publish.go +++ b/pkg/manager/input/input-cursor/publish.go @@ -20,17 +20,17 @@ package cursor import ( "time" - input "github.com/elastic/beats/v7/filebeat/input/v2" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" - "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/elastic-agent-inputs/pkg/manager/input" + "github.com/elastic/elastic-agent-inputs/pkg/publisher" + "github.com/elastic/elastic-agent-inputs/pkg/statestore" + "github.com/elastic/elastic-agent-libs/transform/typeconv" ) // Publisher is used to publish an event and update the cursor in a single call to Publish. // Inputs are allowed to pass `nil` as cursor state. In this case the state is not updated, but the // event will still be published as is. type Publisher interface { - Publish(event beat.Event, cursor interface{}) error + Publish(event publisher.Event, cursor interface{}) error } // cursorPublisher implements the Publisher interface and used internally by the managedInput. @@ -40,7 +40,7 @@ type Publisher interface { // handler, persisting the pending update. type cursorPublisher struct { canceler input.Canceler - client beat.Client + client publisher.Client cursor *Cursor } @@ -63,7 +63,7 @@ type updateOp struct { // It overwrite event.Private with the update operation, before finally sending the event. // The ACK ordering in the publisher pipeline guarantees that update operations // will be ACKed and executed in the correct order. -func (c *cursorPublisher) Publish(event beat.Event, cursorUpdate interface{}) error { +func (c *cursorPublisher) Publish(event publisher.Event, cursorUpdate interface{}) error { if cursorUpdate == nil { return c.forward(event) } @@ -77,7 +77,7 @@ func (c *cursorPublisher) Publish(event beat.Event, cursorUpdate interface{}) er return c.forward(event) } -func (c *cursorPublisher) forward(event beat.Event) error { +func (c *cursorPublisher) forward(event publisher.Event) error { c.client.Publish(event) if c.canceler == nil { return nil @@ -94,7 +94,7 @@ func createUpdateOp(store *store, resource *resource, updates interface{}) (*upd cursor := resource.pendingCursor if resource.activeCursorOperations == 0 { var tmp interface{} - typeconv.Convert(&tmp, cursor) + _ = typeconv.Convert(&tmp, cursor) resource.pendingCursor = tmp cursor = tmp } @@ -133,7 +133,7 @@ func (op *updateOp) Execute(n uint) { resource.cursor = resource.pendingCursor resource.pendingCursor = nil } else { - typeconv.Convert(&resource.cursor, op.delta) + _ = typeconv.Convert(&resource.cursor, op.delta) } if resource.internalState.Updated.Before(op.timestamp) { diff --git a/pkg/manager/input/input-cursor/publish_test.go b/pkg/manager/input/input-cursor/publish_test.go index 28c274b..c685aa9 100644 --- a/pkg/manager/input/input-cursor/publish_test.go +++ b/pkg/manager/input/input-cursor/publish_test.go @@ -24,37 +24,39 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/beats/v7/libbeat/beat" - pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" + "github.com/elastic/elastic-agent-inputs/pkg/publisher" + pubtest "github.com/elastic/elastic-agent-inputs/pkg/publisher/testing" ) func TestPublish(t *testing.T) { t.Run("event with cursor state creates update operation", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() cursor := makeCursor(store, store.Get("test::key")) - var actual beat.Event + var actual publisher.Event client := &pubtest.FakeClient{ - PublishFunc: func(event beat.Event) { actual = event }, + PublishFunc: func(event publisher.Event) { actual = event }, } - publisher := cursorPublisher{nil, client, &cursor} - publisher.Publish(beat.Event{}, "test") + p := cursorPublisher{nil, client, &cursor} + err := p.Publish(publisher.Event{}, "test") + require.NoError(t, err) require.NotNil(t, actual.Private) }) t.Run("event without cursor creates no update operation", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() cursor := makeCursor(store, store.Get("test::key")) - var actual beat.Event + var actual publisher.Event client := &pubtest.FakeClient{ - PublishFunc: func(event beat.Event) { actual = event }, + PublishFunc: func(event publisher.Event) { actual = event }, } - publisher := cursorPublisher{nil, client, &cursor} - publisher.Publish(beat.Event{}, nil) + p := cursorPublisher{nil, client, &cursor} + err := p.Publish(publisher.Event{}, nil) + require.NoError(t, err) require.Nil(t, actual.Private) }) @@ -62,19 +64,19 @@ func TestPublish(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) cancel() - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() cursor := makeCursor(store, store.Get("test::key")) - publisher := cursorPublisher{ctx, &pubtest.FakeClient{}, &cursor} - err := publisher.Publish(beat.Event{}, nil) + p := cursorPublisher{ctx, &pubtest.FakeClient{}, &cursor} + err := p.Publish(publisher.Event{}, nil) require.Equal(t, context.Canceled, err) }) } func TestOp_Execute(t *testing.T) { t.Run("applying final op marks the key as finished", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() res := store.Get("test::key") @@ -99,7 +101,7 @@ func TestOp_Execute(t *testing.T) { // when acking N events, intermediate updates are dropped in favor of the latest update operation. // This test checks that the resource is correctly marked as finished. - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() res := store.Get("test::key") @@ -125,7 +127,7 @@ func TestOp_Execute(t *testing.T) { // when acking N events, intermediate updates are dropped in favor of the latest update operation. // This test checks that the resource is correctly marked as finished. - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() res := store.Get("test::key") diff --git a/pkg/manager/input/input-cursor/store.go b/pkg/manager/input/input-cursor/store.go index cc755f0..c3436f8 100644 --- a/pkg/manager/input/input-cursor/store.go +++ b/pkg/manager/input/input-cursor/store.go @@ -22,13 +22,13 @@ import ( "sync" "time" - "github.com/elastic/beats/v7/libbeat/common/atomic" - "github.com/elastic/beats/v7/libbeat/common/cleanup" - "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" - "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/elastic-agent-inputs/pkg/statestore" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/cleanup" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/transform/typeconv" "github.com/elastic/go-concert" "github.com/elastic/go-concert/unison" + "go.uber.org/atomic" ) // store encapsulates the persistent store and the in memory state store, that @@ -290,7 +290,7 @@ func readStates(log *logp.Logger, store *statestore.Store, prefix string) (*stat } err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { - if !strings.HasPrefix(string(key), keyPrefix) { + if !strings.HasPrefix(key, keyPrefix) { return true, nil } diff --git a/pkg/manager/input/input-cursor/store_test.go b/pkg/manager/input/input-cursor/store_test.go index ffad94f..8065411 100644 --- a/pkg/manager/input/input-cursor/store_test.go +++ b/pkg/manager/input/input-cursor/store_test.go @@ -26,8 +26,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/beats/v7/libbeat/statestore" - "github.com/elastic/beats/v7/libbeat/statestore/storetest" + "github.com/elastic/elastic-agent-inputs/pkg/statestore" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/storetest" "github.com/elastic/elastic-agent-libs/logp" ) @@ -45,7 +45,7 @@ func TestStore_OpenClose(t *testing.T) { }) defer cleanup() - store := testOpenStore(t, "test", nil) + store := testOpenStore(t, nil) store.Release() require.True(t, closed) @@ -57,7 +57,7 @@ func TestStore_OpenClose(t *testing.T) { }) t.Run("load from empty", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() require.Equal(t, 0, len(storeMemorySnapshot(store))) require.Equal(t, 0, len(storeInSyncSnapshot(store))) @@ -69,7 +69,7 @@ func TestStore_OpenClose(t *testing.T) { "test::key1": {Cursor: "2"}, } - store := testOpenStore(t, "test", createSampleStore(t, states)) + store := testOpenStore(t, createSampleStore(t, states)) defer store.Release() checkEqualStoreState(t, states, storeMemorySnapshot(store)) @@ -82,7 +82,7 @@ func TestStore_OpenClose(t *testing.T) { "other::key": {Cursor: "2"}, } - store := testOpenStore(t, "test", createSampleStore(t, states)) + store := testOpenStore(t, createSampleStore(t, states)) defer store.Release() want := map[string]state{ @@ -96,7 +96,7 @@ func TestStore_OpenClose(t *testing.T) { func TestStore_Get(t *testing.T) { t.Run("find existing resource", func(t *testing.T) { cursorState := state{Cursor: "1"} - store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + store := testOpenStore(t, createSampleStore(t, map[string]state{ "test::key0": cursorState, })) defer store.Release() @@ -112,7 +112,7 @@ func TestStore_Get(t *testing.T) { }) t.Run("access unknown resource", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() res := store.Get("test::key") @@ -124,7 +124,7 @@ func TestStore_Get(t *testing.T) { }) t.Run("same resource is returned", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() res1 := store.Get("test::key") @@ -142,7 +142,7 @@ func TestStore_Get(t *testing.T) { func TestStore_UpdateTTL(t *testing.T) { t.Run("add TTL for new entry to store", func(t *testing.T) { // when creating a resource we set the TTL and insert a new key value pair without cursor value into the store: - store := testOpenStore(t, "test", createSampleStore(t, nil)) + store := testOpenStore(t, createSampleStore(t, nil)) defer store.Release() res := store.Get("test::key") @@ -161,7 +161,7 @@ func TestStore_UpdateTTL(t *testing.T) { }) t.Run("update TTL for in-sync resource does not overwrite state", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + store := testOpenStore(t, createSampleStore(t, map[string]state{ "test::key": { TTL: 1 * time.Second, Cursor: "test", @@ -196,7 +196,7 @@ func TestStore_UpdateTTL(t *testing.T) { Cursor: "test", }, }) - store := testOpenStore(t, "test", backend) + store := testOpenStore(t, backend) defer store.Release() // create pending update operation @@ -235,12 +235,12 @@ func closeStoreWith(fn func(s *store)) func() { } } -func testOpenStore(t *testing.T, prefix string, persistentStore StateStore) *store { +func testOpenStore(t *testing.T, persistentStore StateStore) *store { if persistentStore == nil { persistentStore = createSampleStore(t, nil) } - store, err := openStore(logp.NewLogger("test"), persistentStore, prefix) + store, err := openStore(logp.NewLogger("test"), persistentStore, "test") if err != nil { t.Fatalf("failed to open the store") } @@ -329,10 +329,8 @@ func storeInSyncSnapshot(store *store) map[string]state { // fails with Errorf if the state differ. // // Note: testify is too strict when comparing timestamp, better use checkEqualStoreState. -func checkEqualStoreState(t *testing.T, want, got map[string]state) bool { +func checkEqualStoreState(t *testing.T, want, got map[string]state) { if d := cmp.Diff(want, got); d != "" { t.Errorf("store state mismatch (-want +got):\n%s", d) - return false } - return true } diff --git a/pkg/manager/input/input-stateless/stateless.go b/pkg/manager/input/input-stateless/stateless.go index c9d5114..d7335bb 100644 --- a/pkg/manager/input/input-stateless/stateless.go +++ b/pkg/manager/input/input-stateless/stateless.go @@ -23,8 +23,8 @@ import ( "github.com/elastic/go-concert/unison" - v2 "github.com/elastic/beats/v7/filebeat/input/v2" - "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-inputs/pkg/manager/input" + "github.com/elastic/elastic-agent-inputs/pkg/publisher" conf "github.com/elastic/elastic-agent-libs/config" ) @@ -37,32 +37,32 @@ type InputManager struct { // Input is the interface transient inputs are required to implemented. type Input interface { Name() string - Test(v2.TestContext) error - Run(ctx v2.Context, publish Publisher) error + Test(input.TestContext) error + Run(ctx input.Context, publish Publisher) error } // Publisher is used by the Input to emit events. type Publisher interface { - Publish(beat.Event) + Publish(publisher.Event) } type configuredInput struct { input Input } -var _ v2.InputManager = InputManager{} +var _ input.InputManager = InputManager{} // NewInputManager wraps the given configure function to create a new stateless input manager. func NewInputManager(configure func(*conf.C) (Input, error)) InputManager { return InputManager{Configure: configure} } -// Init does nothing. Init is required to fullfil the v2.InputManager interface. -func (m InputManager) Init(_ unison.Group, _ v2.Mode) error { return nil } +// Init does nothing. Init is required to fullfil the input.InputManager interface. +func (m InputManager) Init(_ unison.Group, _ input.Mode) error { return nil } // Create configures a transient input and ensures that the final input can be used with // with the filebeat input architecture. -func (m InputManager) Create(cfg *conf.C) (v2.Input, error) { +func (m InputManager) Create(cfg *conf.C) (input.Input, error) { inp, err := m.Configure(cfg) if err != nil { return nil, err @@ -72,7 +72,7 @@ func (m InputManager) Create(cfg *conf.C) (v2.Input, error) { func (si configuredInput) Name() string { return si.input.Name() } -func (si configuredInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) (err error) { +func (si configuredInput) Run(ctx input.Context, pipeline publisher.PipelineConnector) (err error) { defer func() { if v := recover(); v != nil { if e, ok := v.(error); ok { @@ -83,8 +83,8 @@ func (si configuredInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) ( } }() - client, err := pipeline.ConnectWith(beat.ClientConfig{ - PublishMode: beat.DefaultGuarantees, + client, err := pipeline.ConnectWith(publisher.ClientConfig{ + PublishMode: publisher.DefaultGuarantees, // configure pipeline to disconnect input on stop signal. CloseRef: ctx.Cancelation, @@ -97,6 +97,6 @@ func (si configuredInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) ( return si.input.Run(ctx, client) } -func (si configuredInput) Test(ctx v2.TestContext) error { +func (si configuredInput) Test(ctx input.TestContext) error { return si.input.Test(ctx) } diff --git a/pkg/manager/input/input-stateless/stateless_test.go b/pkg/manager/input/input-stateless/stateless_test.go index 1362733..4460664 100644 --- a/pkg/manager/input/input-stateless/stateless_test.go +++ b/pkg/manager/input/input-stateless/stateless_test.go @@ -26,32 +26,32 @@ import ( "github.com/stretchr/testify/require" - v2 "github.com/elastic/beats/v7/filebeat/input/v2" - stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" - pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" + "github.com/elastic/elastic-agent-inputs/pkg/manager/input" + stateless "github.com/elastic/elastic-agent-inputs/pkg/manager/input/input-stateless" + "github.com/elastic/elastic-agent-inputs/pkg/publisher" + pubtest "github.com/elastic/elastic-agent-inputs/pkg/publisher/testing" + "github.com/elastic/elastic-agent-libs/atomic" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" ) type fakeStatelessInput struct { - OnTest func(v2.TestContext) error - OnRun func(v2.Context, stateless.Publisher) error + OnTest func(input.TestContext) error + OnRun func(input.Context, stateless.Publisher) error } func TestStateless_Run(t *testing.T) { t.Run("events are published", func(t *testing.T) { const numEvents = 5 - ch := make(chan beat.Event) + ch := make(chan publisher.Event) connector := pubtest.ConstClient(pubtest.ChClient(ch)) - input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{ - OnRun: func(ctx v2.Context, publisher stateless.Publisher) error { + inp := createConfiguredInput(t, constInputManager(&fakeStatelessInput{ + OnRun: func(ctx input.Context, p stateless.Publisher) error { defer close(ch) for i := 0; i < numEvents; i++ { - publisher.Publish(beat.Event{Fields: map[string]interface{}{"id": i}}) + p.Publish(publisher.Event{Fields: map[string]interface{}{"id": i}}) } return nil }, @@ -62,7 +62,7 @@ func TestStateless_Run(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err = input.Run(v2.Context{}, connector) + err = inp.Run(input.Context{}, connector) }() var receivedEvents int @@ -76,14 +76,14 @@ func TestStateless_Run(t *testing.T) { }) t.Run("capture panic and return error", func(t *testing.T) { - input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{ - OnRun: func(_ v2.Context, _ stateless.Publisher) error { + inp := createConfiguredInput(t, constInputManager(&fakeStatelessInput{ + OnRun: func(_ input.Context, _ stateless.Publisher) error { panic("oops") }, }), nil) var clientCounters pubtest.ClientCounter - err := input.Run(v2.Context{}, clientCounters.BuildConnector()) + err := inp.Run(input.Context{}, clientCounters.BuildConnector()) require.Error(t, err) require.Equal(t, 1, clientCounters.Total()) @@ -93,11 +93,11 @@ func TestStateless_Run(t *testing.T) { t.Run("publisher unblocks if shutdown signal is send", func(t *testing.T) { // the input blocks in the publisher. We loop until the shutdown signal is received var started atomic.Bool - input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{ - OnRun: func(ctx v2.Context, publisher stateless.Publisher) error { + inp := createConfiguredInput(t, constInputManager(&fakeStatelessInput{ + OnRun: func(ctx input.Context, p stateless.Publisher) error { for ctx.Cancelation.Err() == nil { started.Store(true) - publisher.Publish(beat.Event{ + p.Publish(publisher.Event{ Fields: mapstr.M{ "hello": "world", }, @@ -110,9 +110,9 @@ func TestStateless_Run(t *testing.T) { // connector creates a client the blocks forever until the shutdown signal is received var publishCalls atomic.Int connector := pubtest.FakeConnector{ - ConnectFunc: func(config beat.ClientConfig) (beat.Client, error) { + ConnectFunc: func(config publisher.ClientConfig) (publisher.Client, error) { return &pubtest.FakeClient{ - PublishFunc: func(event beat.Event) { + PublishFunc: func(event publisher.Event) { publishCalls.Inc() <-config.CloseRef.Done() }, @@ -128,7 +128,7 @@ func TestStateless_Run(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err = input.Run(v2.Context{Cancelation: ctx}, connector) + err = inp.Run(input.Context{Cancelation: ctx}, connector) }() // signal and wait for shutdown @@ -148,14 +148,14 @@ func TestStateless_Run(t *testing.T) { connector := pubtest.FailingConnector(errOpps) var run atomic.Int - input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{ - OnRun: func(_ v2.Context, publisher stateless.Publisher) error { + i := createConfiguredInput(t, constInputManager(&fakeStatelessInput{ + OnRun: func(_ input.Context, publisher stateless.Publisher) error { run.Inc() return nil }, }), nil) - err := input.Run(v2.Context{}, connector) + err := i.Run(input.Context{}, connector) require.True(t, errors.Is(err, errOpps)) require.Equal(t, 0, run.Load()) }) @@ -163,21 +163,22 @@ func TestStateless_Run(t *testing.T) { func (f *fakeStatelessInput) Name() string { return "test" } -func (f *fakeStatelessInput) Test(ctx v2.TestContext) error { +func (f *fakeStatelessInput) Test(ctx input.TestContext) error { if f.OnTest != nil { return f.OnTest(ctx) } return nil } -func (f *fakeStatelessInput) Run(ctx v2.Context, publish stateless.Publisher) error { +func (f *fakeStatelessInput) Run(ctx input.Context, publish stateless.Publisher) error { if f.OnRun != nil { return f.OnRun(ctx, publish) } return errors.New("oops, run not implemented") } -func createConfiguredInput(t *testing.T, manager stateless.InputManager, config map[string]interface{}) v2.Input { +//nolint:unparam // when we add more tests it will get a config +func createConfiguredInput(t *testing.T, manager stateless.InputManager, config map[string]interface{}) input.Input { input, err := manager.Create(conf.MustNewConfigFrom(config)) require.NoError(t, err) return input diff --git a/pkg/manager/input/input.go b/pkg/manager/input/input.go index 42706bc..bda0484 100644 --- a/pkg/manager/input/input.go +++ b/pkg/manager/input/input.go @@ -15,12 +15,15 @@ // specific language governing permissions and limitations // under the License. -package v2 +package input import ( - "github.com/elastic/beats/v7/libbeat/beat" + "time" + + "github.com/elastic/elastic-agent-inputs/pkg/publisher" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/gofrs/uuid" "github.com/elastic/go-concert/unison" ) @@ -72,7 +75,25 @@ type Input interface { // Run starts the data collection. Run must return an error only if the // error is fatal making it impossible for the input to recover. - Run(Context, beat.PipelineConnector) error + Run(Context, publisher.PipelineConnector) error +} + +// Info stores a input instance meta data. +type Info struct { + Input string // The actual beat's name + IndexPrefix string // The beat's index prefix in Elasticsearch. + Version string // The beat version. Defaults to the libbeat version when an implementation does not set a version + Name string // configured beat name + Hostname string // hostname + ID uuid.UUID // ID assigned to beat machine + EphemeralID uuid.UUID // ID assigned to beat process invocation (PID) + FirstStart time.Time // The time of the first start of the Beat. + StartTime time.Time // The time of last start of the Beat. Updated when the Beat is started or restarted. + + // Monitoring-related fields + Monitoring struct { + DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring + } } // Context provides the Input Run function with common environmental @@ -86,7 +107,7 @@ type Context struct { ID string // Agent provides additional Beat info like instance ID or beat name. - Agent beat.Info + Agent Info // Cancelation is used by Beats to signal the input to shutdown. Cancelation Canceler @@ -99,10 +120,10 @@ type TestContext struct { // with labels that will identify logs for the input. Logger *logp.Logger - // Agent provides additional Beat info like instance ID or beat name. - Agent beat.Info + // Agent provides additional info like instance ID or beat name. + Agent Info - // Cancelation is used by Beats to signal the input to shutdown. + // Cancelation is used by the binary to signal the input to shutdown. Cancelation Canceler } diff --git a/pkg/manager/input/internal/inputest/inputest.go b/pkg/manager/input/internal/inputest/inputest.go index 696b86e..2256fad 100644 --- a/pkg/manager/input/internal/inputest/inputest.go +++ b/pkg/manager/input/internal/inputest/inputest.go @@ -20,9 +20,9 @@ package inputest import ( "errors" - v2 "github.com/elastic/beats/v7/filebeat/input/v2" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/elastic-agent-inputs/pkg/feature" + "github.com/elastic/elastic-agent-inputs/pkg/manager/input" + "github.com/elastic/elastic-agent-inputs/pkg/publisher" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/go-concert/unison" ) @@ -30,24 +30,24 @@ import ( // MockInputManager can be used as InputManager replacement in tests that require a new Input Manager. // The OnInit and OnConfigure functions are executed if the corresponding methods get called. type MockInputManager struct { - OnInit func(v2.Mode) error + OnInit func(input.Mode) error OnConfigure InputConfigurer } // InputConfigurer describes the interface for user supplied functions, that is // used to create a new input from a configuration object. -type InputConfigurer func(*conf.C) (v2.Input, error) +type InputConfigurer func(*conf.C) (input.Input, error) // MockInput can be used as an Input instance in tests that require a new Input with definable behavior. // The OnTest and OnRun functions are executed if the corresponding methods get called. type MockInput struct { Type string - OnTest func(v2.TestContext) error - OnRun func(v2.Context, beat.PipelineConnector) error + OnTest func(input.TestContext) error + OnRun func(input.Context, publisher.PipelineConnector) error } // Init returns nil if OnInit is not set. Otherwise the return value of OnInit is returned. -func (m *MockInputManager) Init(_ unison.Group, mode v2.Mode) error { +func (m *MockInputManager) Init(_ unison.Group, mode input.Mode) error { if m.OnInit != nil { return m.OnInit(mode) } @@ -56,18 +56,18 @@ func (m *MockInputManager) Init(_ unison.Group, mode v2.Mode) error { // Create fails with an error if OnConfigure is not set. Otherwise the return // values of OnConfigure are returned. -func (m *MockInputManager) Create(cfg *conf.C) (v2.Input, error) { +func (m *MockInputManager) Create(cfg *conf.C) (input.Input, error) { if m.OnConfigure != nil { return m.OnConfigure(cfg) } return nil, errors.New("oops, OnConfigure not implemented ") } -// Name return the `Type` field of MockInput. It is required to satisfy the v2.Input interface. +// Name return the `Type` field of MockInput. It is required to satisfy the input.Input interface. func (f *MockInput) Name() string { return f.Type } // Test return nil if OnTest is not set. Otherwise OnTest will be called. -func (f *MockInput) Test(ctx v2.TestContext) error { +func (f *MockInput) Test(ctx input.TestContext) error { if f.OnTest != nil { return f.OnTest(ctx) } @@ -75,7 +75,7 @@ func (f *MockInput) Test(ctx v2.TestContext) error { } // Run returns nil if OnRun is not set. -func (f *MockInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) error { +func (f *MockInput) Run(ctx input.Context, pipeline publisher.PipelineConnector) error { if f.OnRun != nil { return f.OnRun(ctx, pipeline) } @@ -85,20 +85,20 @@ func (f *MockInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) error { // ConstInputManager create a MockInputManager that always returns input when // Configure is called. Use ConstInputManager for tests that require an // InputManager, but create only one Input instance. -func ConstInputManager(input v2.Input) *MockInputManager { +func ConstInputManager(input input.Input) *MockInputManager { return &MockInputManager{OnConfigure: ConfigureConstInput(input)} } // ConfigureConstInput return an InputConfigurer that returns always input when called. -func ConfigureConstInput(input v2.Input) InputConfigurer { - return func(_ *conf.C) (v2.Input, error) { - return input, nil +func ConfigureConstInput(i input.Input) InputConfigurer { + return func(_ *conf.C) (input.Input, error) { + return i, nil } } -// SinglePlugin wraps an InputManager into a slice of v2.Plugin, that can be used directly with v2.NewLoader. -func SinglePlugin(name string, manager v2.InputManager) []v2.Plugin { - return []v2.Plugin{{ +// SinglePlugin wraps an InputManager into a slice of input.Plugin, that can be used directly with input.NewLoader. +func SinglePlugin(name string, manager input.InputManager) []input.Plugin { + return []input.Plugin{{ Name: name, Stability: feature.Stable, Manager: manager, diff --git a/pkg/manager/input/internal/inputest/loader.go b/pkg/manager/input/internal/inputest/loader.go index 5093a0c..5f2e370 100644 --- a/pkg/manager/input/internal/inputest/loader.go +++ b/pkg/manager/input/internal/inputest/loader.go @@ -20,7 +20,7 @@ package inputest import ( "testing" - v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/elastic-agent-inputs/pkg/manager/input" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -28,13 +28,13 @@ import ( // Loader wraps the input Loader in order to provide additional methods for reuse in tests. type Loader struct { t testing.TB - *v2.Loader + *input.Loader } // MustNewTestLoader creates a new Loader. The test fails with fatal if the // NewLoader constructor function returns an error. -func MustNewTestLoader(t testing.TB, plugins []v2.Plugin, typeField, defaultType string) *Loader { - l, err := v2.NewLoader(logp.NewLogger("test"), plugins, typeField, defaultType) +func MustNewTestLoader(t testing.TB, plugins []input.Plugin, typeField, defaultType string) *Loader { + l, err := input.NewLoader(logp.NewLogger("test"), plugins, typeField, defaultType) if err != nil { t.Fatalf("Failed to create loader: %v", err) } @@ -43,7 +43,7 @@ func MustNewTestLoader(t testing.TB, plugins []v2.Plugin, typeField, defaultType // MustConfigure confiures a new input. The test fails with t.Fatal if the // operation failed. -func (l *Loader) MustConfigure(cfg *conf.C) v2.Input { +func (l *Loader) MustConfigure(cfg *conf.C) input.Input { i, err := l.Configure(cfg) if err != nil { l.t.Fatalf("Failed to create the input: %v", err) diff --git a/pkg/manager/input/loader.go b/pkg/manager/input/loader.go index 2c7b397..9fdf8f4 100644 --- a/pkg/manager/input/loader.go +++ b/pkg/manager/input/loader.go @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -package v2 +package input import ( "fmt" - "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/elastic-agent-inputs/pkg/feature" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-concert/unison" diff --git a/pkg/manager/input/loader_test.go b/pkg/manager/input/loader_test.go index 56fac1a..6f8b470 100644 --- a/pkg/manager/input/loader_test.go +++ b/pkg/manager/input/loader_test.go @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -package v2 +package input import ( "errors" "testing" - "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/elastic-agent-inputs/pkg/feature" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) diff --git a/pkg/manager/input/mode_string.go b/pkg/manager/input/mode_string.go index 330b15f..3c418cb 100644 --- a/pkg/manager/input/mode_string.go +++ b/pkg/manager/input/mode_string.go @@ -17,7 +17,7 @@ // Code generated by "stringer -type Mode -trimprefix Mode"; DO NOT EDIT. -package v2 +package input import "strconv" diff --git a/pkg/manager/input/plugin.go b/pkg/manager/input/plugin.go index 8108497..61704b3 100644 --- a/pkg/manager/input/plugin.go +++ b/pkg/manager/input/plugin.go @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -package v2 +package input import ( "fmt" - "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/elastic-agent-inputs/pkg/feature" ) // Plugin describes an input type. Input types should provide a constructor diff --git a/pkg/manager/input/plugin_test.go b/pkg/manager/input/plugin_test.go index 6a11173..0595bbb 100644 --- a/pkg/manager/input/plugin_test.go +++ b/pkg/manager/input/plugin_test.go @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -package v2 +package input import ( "testing" - "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/elastic-agent-inputs/pkg/feature" ) func TestPlugin_Validate(t *testing.T) { diff --git a/pkg/manager/input/simplemanager.go b/pkg/manager/input/simplemanager.go index 1ce7545..5fa2484 100644 --- a/pkg/manager/input/simplemanager.go +++ b/pkg/manager/input/simplemanager.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package v2 +package input import ( conf "github.com/elastic/elastic-agent-libs/config" diff --git a/pkg/manager/input/util_test.go b/pkg/manager/input/util_test.go index 59697b2..fa20527 100644 --- a/pkg/manager/input/util_test.go +++ b/pkg/manager/input/util_test.go @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -package v2 +package input import ( "errors" "testing" - "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-inputs/pkg/publisher" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/go-concert/unison" ) @@ -34,7 +34,7 @@ type fakeInputManager struct { type fakeInput struct { Type string OnTest func(TestContext) error - OnRun func(Context, beat.PipelineConnector) error + OnRun func(Context, publisher.PipelineConnector) error } func makeConfigFakeInput(prototype fakeInput) func(*conf.C) (Input, error) { @@ -66,7 +66,7 @@ func (f *fakeInput) Test(ctx TestContext) error { return nil } -func (f *fakeInput) Run(ctx Context, pipeline beat.PipelineConnector) error { +func (f *fakeInput) Run(ctx Context, pipeline publisher.PipelineConnector) error { if f.OnRun != nil { return f.OnRun(ctx, pipeline) } diff --git a/pkg/manager/internal/resources/goroutines.go b/pkg/manager/internal/resources/goroutines.go index 7c9ab17..780070f 100644 --- a/pkg/manager/internal/resources/goroutines.go +++ b/pkg/manager/internal/resources/goroutines.go @@ -18,6 +18,7 @@ package resources import ( + "errors" "fmt" "os" "runtime" @@ -61,12 +62,12 @@ func (c GoroutinesChecker) Check(t testing.TB) { func dumpGoroutines() { profile := pprof.Lookup("goroutine") - profile.WriteTo(os.Stdout, 2) + _ = profile.WriteTo(os.Stdout, 2) } func (c GoroutinesChecker) check() error { after, err := c.WaitUntilOriginalCount() - if err == ErrTimeout { + if errors.Is(err, ErrTimeout) { return fmt.Errorf("possible goroutines leak, before: %d, after: %d", c.before, after) } return err diff --git a/pkg/publisher/acker/acker.go b/pkg/publisher/acker/acker.go index 59a2551..9ad3c41 100644 --- a/pkg/publisher/acker/acker.go +++ b/pkg/publisher/acker/acker.go @@ -20,33 +20,33 @@ package acker import ( "sync" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/elastic-agent-inputs/pkg/publisher" + "go.uber.org/atomic" ) // Nil creates an ACKer that does nothing. -func Nil() beat.ACKer { +func Nil() publisher.ACKer { return nilACKer{} } type nilACKer struct{} -func (nilACKer) AddEvent(event beat.Event, published bool) {} -func (nilACKer) ACKEvents(n int) {} -func (nilACKer) Close() {} +func (nilACKer) AddEvent(event publisher.Event, published bool) {} +func (nilACKer) ACKEvents(n int) {} +func (nilACKer) Close() {} // RawCounting reports the number of ACKed events as has been reported by the outputs or queue. // The ACKer does not keep track of dropped events. Events after the client has // been closed will still be reported. -func RawCounting(fn func(int)) beat.ACKer { +func RawCounting(fn func(int)) publisher.ACKer { return countACKer(fn) } type countACKer func(int) -func (countACKer) AddEvent(_ beat.Event, _ bool) {} -func (fn countACKer) ACKEvents(n int) { fn(n) } -func (countACKer) Close() {} +func (countACKer) AddEvent(_ publisher.Event, _ bool) {} +func (fn countACKer) ACKEvents(n int) { fn(n) } +func (countACKer) Close() {} // TrackingCounter keeps track of published and dropped events. It reports // the number of acked events from the queue in the 'acked' argument and the @@ -59,12 +59,12 @@ func (countACKer) Close() {} // event: X X D D X D D X D X X X // // If the output ACKs 3 events, then all events from index 0 to 6 will be reported because: -// - the drop sequence for events 2 and 3 is inbetween the number of forwarded and ACKed events +// - the drop sequence for events 2 and 3 is in between the number of forwarded and ACKed events // - events 5-6 have been dropped as well, but event 7 is not ACKed yet // // If there is no event currently tracked by this ACKer and the next event is dropped by the processors, // then `fn` will be called immediately with acked=0 and total=1. -func TrackingCounter(fn func(acked, total int)) beat.ACKer { +func TrackingCounter(fn func(acked, total int)) publisher.ACKer { a := &trackingACKer{fn: fn} init := &gapInfo{} a.lst.head = init @@ -74,7 +74,7 @@ func TrackingCounter(fn func(acked, total int)) beat.ACKer { // Counting returns an ACK count for all events a client has tried to publish. // The ACKer keeps track of dropped events as well, and adjusts the ACK from the outputs accordingly. -func Counting(fn func(n int)) beat.ACKer { +func Counting(fn func(n int)) publisher.ACKer { return TrackingCounter(func(_ int, total int) { fn(total) }) @@ -97,7 +97,7 @@ type gapInfo struct { send, dropped int } -func (a *trackingACKer) AddEvent(_ beat.Event, published bool) { +func (a *trackingACKer) AddEvent(_ publisher.Event, published bool) { a.events.Inc() if published { a.addPublishedEvent() @@ -221,22 +221,22 @@ func (a *trackingACKer) Close() {} // event: X X D D X D D X D X X X // // If the output ACKs 3 events, then all events from index 0 to 6 will be reported because: -// - the drop sequence for events 2 and 3 is inbetween the number of forwarded and ACKed events +// - the drop sequence for events 2 and 3 is in between the number of forwarded and ACKed events // - events 5-6 have been dropped as well, but event 7 is not ACKed yet -func EventPrivateReporter(fn func(acked int, data []interface{})) beat.ACKer { +func EventPrivateReporter(fn func(acked int, data []interface{})) publisher.ACKer { a := &eventDataACKer{fn: fn} a.ACKer = TrackingCounter(a.onACK) return a } type eventDataACKer struct { - beat.ACKer + publisher.ACKer mu sync.Mutex data []interface{} fn func(acked int, data []interface{}) } -func (a *eventDataACKer) AddEvent(event beat.Event, published bool) { +func (a *eventDataACKer) AddEvent(event publisher.Event, published bool) { a.mu.Lock() a.data = append(a.data, event.Private) a.mu.Unlock() @@ -260,7 +260,7 @@ func (a *eventDataACKer) onACK(acked, total int) { // LastEventPrivateReporter reports only the 'latest' published and acked // event if a batch of events have been ACKed. -func LastEventPrivateReporter(fn func(acked int, data interface{})) beat.ACKer { +func LastEventPrivateReporter(fn func(acked int, data interface{})) publisher.ACKer { ignored := 0 return EventPrivateReporter(func(acked int, data []interface{}) { for i := len(data) - 1; i >= 0; i-- { @@ -277,13 +277,13 @@ func LastEventPrivateReporter(fn func(acked int, data interface{})) beat.ACKer { } // Combine forwards events to a list of ackers. -func Combine(as ...beat.ACKer) beat.ACKer { +func Combine(as ...publisher.ACKer) publisher.ACKer { return ackerList(as) } -type ackerList []beat.ACKer +type ackerList []publisher.ACKer -func (l ackerList) AddEvent(event beat.Event, published bool) { +func (l ackerList) AddEvent(event publisher.Event, published bool) { for _, a := range l { a.AddEvent(event, published) } @@ -304,16 +304,16 @@ func (l ackerList) Close() { // ConnectionOnly ensures that the given ACKer is only used for as long as the // pipeline Client is active. Once the Client is closed, the ACKer will drop // its internal state and no more ACK events will be processed. -func ConnectionOnly(a beat.ACKer) beat.ACKer { +func ConnectionOnly(a publisher.ACKer) publisher.ACKer { return &clientOnlyACKer{acker: a} } type clientOnlyACKer struct { mu sync.Mutex - acker beat.ACKer + acker publisher.ACKer } -func (a *clientOnlyACKer) AddEvent(event beat.Event, published bool) { +func (a *clientOnlyACKer) AddEvent(event publisher.Event, published bool) { a.mu.Lock() defer a.mu.Unlock() if sub := a.acker; sub != nil { diff --git a/pkg/publisher/acker/acker_test.go b/pkg/publisher/acker/acker_test.go index 2f02ed5..f3b5e89 100644 --- a/pkg/publisher/acker/acker_test.go +++ b/pkg/publisher/acker/acker_test.go @@ -15,18 +15,18 @@ // specific language governing permissions and limitations // under the License. +//nolint:dupl // tests are equivalent package acker import ( "testing" + "github.com/elastic/elastic-agent-inputs/pkg/publisher" "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/libbeat/beat" ) type fakeACKer struct { - AddEventFunc func(event beat.Event, published bool) + AddEventFunc func(event publisher.Event, published bool) ACKEventsFunc func(n int) CloseFunc func() } @@ -36,8 +36,8 @@ func TestNil(t *testing.T) { require.NotNil(t, acker) // check acker can be used without panic: - acker.AddEvent(beat.Event{}, false) - acker.AddEvent(beat.Event{}, true) + acker.AddEvent(publisher.Event{}, false) + acker.AddEvent(publisher.Event{}, true) acker.ACKEvents(3) acker.Close() } @@ -54,7 +54,7 @@ func TestCounting(t *testing.T) { func TestTracking(t *testing.T) { t.Run("dropped event is acked immediately if empty", func(t *testing.T) { var acked, total int - TrackingCounter(func(a, t int) { acked, total = a, t }).AddEvent(beat.Event{}, false) + TrackingCounter(func(a, t int) { acked, total = a, t }).AddEvent(publisher.Event{}, false) require.Equal(t, 0, acked) require.Equal(t, 1, total) }) @@ -62,8 +62,8 @@ func TestTracking(t *testing.T) { t.Run("no dropped events", func(t *testing.T) { var acked, total int acker := TrackingCounter(func(a, t int) { acked, total = a, t }) - acker.AddEvent(beat.Event{}, true) - acker.AddEvent(beat.Event{}, true) + acker.AddEvent(publisher.Event{}, true) + acker.AddEvent(publisher.Event{}, true) acker.ACKEvents(2) require.Equal(t, 2, acked) require.Equal(t, 2, total) @@ -72,10 +72,10 @@ func TestTracking(t *testing.T) { t.Run("acking published includes dropped events in middle", func(t *testing.T) { var acked, total int acker := TrackingCounter(func(a, t int) { acked, total = a, t }) - acker.AddEvent(beat.Event{}, true) - acker.AddEvent(beat.Event{}, false) - acker.AddEvent(beat.Event{}, false) - acker.AddEvent(beat.Event{}, true) + acker.AddEvent(publisher.Event{}, true) + acker.AddEvent(publisher.Event{}, false) + acker.AddEvent(publisher.Event{}, false) + acker.AddEvent(publisher.Event{}, true) acker.ACKEvents(2) require.Equal(t, 2, acked) require.Equal(t, 4, total) @@ -84,11 +84,11 @@ func TestTracking(t *testing.T) { t.Run("acking published includes dropped events at end of ACK interval", func(t *testing.T) { var acked, total int acker := TrackingCounter(func(a, t int) { acked, total = a, t }) - acker.AddEvent(beat.Event{}, true) - acker.AddEvent(beat.Event{}, true) - acker.AddEvent(beat.Event{}, false) - acker.AddEvent(beat.Event{}, false) - acker.AddEvent(beat.Event{}, true) + acker.AddEvent(publisher.Event{}, true) + acker.AddEvent(publisher.Event{}, true) + acker.AddEvent(publisher.Event{}, false) + acker.AddEvent(publisher.Event{}, false) + acker.AddEvent(publisher.Event{}, true) acker.ACKEvents(2) require.Equal(t, 2, acked) require.Equal(t, 4, total) @@ -97,13 +97,13 @@ func TestTracking(t *testing.T) { t.Run("partial ACKs", func(t *testing.T) { var acked, total int acker := TrackingCounter(func(a, t int) { acked, total = a, t }) - acker.AddEvent(beat.Event{}, true) - acker.AddEvent(beat.Event{}, true) - acker.AddEvent(beat.Event{}, true) - acker.AddEvent(beat.Event{}, true) - acker.AddEvent(beat.Event{}, false) - acker.AddEvent(beat.Event{}, true) - acker.AddEvent(beat.Event{}, true) + acker.AddEvent(publisher.Event{}, true) + acker.AddEvent(publisher.Event{}, true) + acker.AddEvent(publisher.Event{}, true) + acker.AddEvent(publisher.Event{}, true) + acker.AddEvent(publisher.Event{}, false) + acker.AddEvent(publisher.Event{}, true) + acker.AddEvent(publisher.Event{}, true) acker.ACKEvents(2) require.Equal(t, 2, acked) @@ -120,7 +120,7 @@ func TestEventPrivateReporter(t *testing.T) { var acked int var data []interface{} acker := EventPrivateReporter(func(a int, d []interface{}) { acked, data = a, d }) - acker.AddEvent(beat.Event{Private: 1}, false) + acker.AddEvent(publisher.Event{Private: 1}, false) require.Equal(t, 0, acked) require.Equal(t, []interface{}{1}, data) }) @@ -129,9 +129,9 @@ func TestEventPrivateReporter(t *testing.T) { var acked int var data []interface{} acker := EventPrivateReporter(func(a int, d []interface{}) { acked, data = a, d }) - acker.AddEvent(beat.Event{Private: 1}, true) - acker.AddEvent(beat.Event{Private: 2}, true) - acker.AddEvent(beat.Event{Private: 3}, true) + acker.AddEvent(publisher.Event{Private: 1}, true) + acker.AddEvent(publisher.Event{Private: 2}, true) + acker.AddEvent(publisher.Event{Private: 3}, true) acker.ACKEvents(3) require.Equal(t, 3, acked) require.Equal(t, []interface{}{1, 2, 3}, data) @@ -141,9 +141,9 @@ func TestEventPrivateReporter(t *testing.T) { var acked int var data []interface{} acker := EventPrivateReporter(func(a int, d []interface{}) { acked, data = a, d }) - acker.AddEvent(beat.Event{Private: 1}, true) - acker.AddEvent(beat.Event{Private: 2}, false) - acker.AddEvent(beat.Event{Private: 3}, true) + acker.AddEvent(publisher.Event{Private: 1}, true) + acker.AddEvent(publisher.Event{Private: 2}, false) + acker.AddEvent(publisher.Event{Private: 3}, true) acker.ACKEvents(2) require.Equal(t, 2, acked) require.Equal(t, []interface{}{1, 2, 3}, data) @@ -155,7 +155,7 @@ func TestLastEventPrivateReporter(t *testing.T) { var acked int var datum interface{} acker := LastEventPrivateReporter(func(a int, d interface{}) { acked, datum = a, d }) - acker.AddEvent(beat.Event{Private: 1}, false) + acker.AddEvent(publisher.Event{Private: 1}, false) require.Equal(t, 0, acked) require.Equal(t, 1, datum) }) @@ -163,7 +163,7 @@ func TestLastEventPrivateReporter(t *testing.T) { t.Run("dropped event without private is ignored", func(t *testing.T) { var called bool acker := LastEventPrivateReporter(func(_ int, _ interface{}) { called = true }) - acker.AddEvent(beat.Event{Private: nil}, false) + acker.AddEvent(publisher.Event{Private: nil}, false) require.False(t, called) }) @@ -171,9 +171,9 @@ func TestLastEventPrivateReporter(t *testing.T) { var acked int var data interface{} acker := LastEventPrivateReporter(func(a int, d interface{}) { acked, data = a, d }) - acker.AddEvent(beat.Event{Private: 1}, true) - acker.AddEvent(beat.Event{Private: 2}, true) - acker.AddEvent(beat.Event{Private: 3}, true) + acker.AddEvent(publisher.Event{Private: 1}, true) + acker.AddEvent(publisher.Event{Private: 2}, true) + acker.AddEvent(publisher.Event{Private: 3}, true) acker.ACKEvents(3) require.Equal(t, 3, acked) require.Equal(t, 3, data) @@ -184,7 +184,7 @@ func TestCombine(t *testing.T) { t.Run("AddEvent distributes", func(t *testing.T) { var a1, a2 int acker := Combine(countACKerOps(&a1, nil, nil), countACKerOps(&a2, nil, nil)) - acker.AddEvent(beat.Event{}, true) + acker.AddEvent(publisher.Event{}, true) require.Equal(t, 1, a1) require.Equal(t, 1, a2) }) @@ -223,15 +223,15 @@ func TestConnectionOnly(t *testing.T) { }) } -func countACKerOps(add, acked, close *int) beat.ACKer { +func countACKerOps(add, acked, close *int) publisher.ACKer { return &fakeACKer{ - AddEventFunc: func(_ beat.Event, _ bool) { *add++ }, + AddEventFunc: func(_ publisher.Event, _ bool) { *add++ }, ACKEventsFunc: func(_ int) { *acked++ }, CloseFunc: func() { *close++ }, } } -func (f *fakeACKer) AddEvent(event beat.Event, published bool) { +func (f *fakeACKer) AddEvent(event publisher.Event, published bool) { if f.AddEventFunc != nil { f.AddEventFunc(event, published) } diff --git a/pkg/publisher/event.go b/pkg/publisher/event.go index 958a3e4..698013c 100644 --- a/pkg/publisher/event.go +++ b/pkg/publisher/event.go @@ -1,4 +1,8 @@ package publisher +import "github.com/elastic/elastic-agent-libs/mapstr" + type Event struct { + Fields mapstr.M + Private interface{} } diff --git a/pkg/publisher/pipeline.go b/pkg/publisher/pipeline.go index 140d390..29335bf 100644 --- a/pkg/publisher/pipeline.go +++ b/pkg/publisher/pipeline.go @@ -83,7 +83,7 @@ type ACKer interface { // Close informs the ACKer that the Client used to publish to the pipeline has been closed. // No new events should be published anymore. The ACKEvents method still will be actively called // as long as there are pending events for the client in the pipeline. The Close signal can be used - // to supress any ACK event propagation if required. + // to suppress any ACK event propagation if required. // Close might be called from another go-routine than AddEvent and ACKEvents. Close() } diff --git a/pkg/publisher/testing/connector.go b/pkg/publisher/testing/connector.go index ddf48cc..c0d9377 100644 --- a/pkg/publisher/testing/connector.go +++ b/pkg/publisher/testing/connector.go @@ -18,50 +18,50 @@ package testing import ( - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/elastic-agent-inputs/pkg/publisher" + "github.com/elastic/elastic-agent-libs/atomic" ) -// ClientCounter can be used to create a beat.PipelineConnector that count +// ClientCounter can be used to create a publisher.PipelineConnector that count // pipeline connects and disconnects. type ClientCounter struct { total atomic.Int active atomic.Int } -// FakeConnector implements the beat.PipelineConnector interface. +// FakeConnector implements the publisher.PipelineConnector interface. // The ConnectFunc is called for each connection attempt, and must be provided // by tests that wish to use FakeConnector. If ConnectFunc is nil tests will panic // if there is a connection attempt. type FakeConnector struct { - ConnectFunc func(beat.ClientConfig) (beat.Client, error) + ConnectFunc func(publisher.ClientConfig) (publisher.Client, error) } -// FakeClient implements the beat.Client interface. The implementation of a +// FakeClient implements the publisher.Client interface. The implementation of a // custom PublishFunc and CloseFunc are optional. type FakeClient struct { // If set PublishFunc is called for each event that is published by a producer. - PublishFunc func(beat.Event) + PublishFunc func(publisher.Event) // If set CloseFunc is called on Close. Otherwise Close returns nil. CloseFunc func() error } -var _ beat.PipelineConnector = FakeConnector{} -var _ beat.Client = (*FakeClient)(nil) +var _ publisher.PipelineConnector = FakeConnector{} +var _ publisher.Client = (*FakeClient)(nil) // ConnectWith calls the ConnectFunc with the given configuration. -func (c FakeConnector) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { +func (c FakeConnector) ConnectWith(cfg publisher.ClientConfig) (publisher.Client, error) { return c.ConnectFunc(cfg) } // Connect calls the ConnectFunc with an empty configuration. -func (c FakeConnector) Connect() (beat.Client, error) { - return c.ConnectWith(beat.ClientConfig{}) +func (c FakeConnector) Connect() (publisher.Client, error) { + return c.ConnectWith(publisher.ClientConfig{}) } // Publish calls PublishFunc, if PublishFunc is not nil. -func (c *FakeClient) Publish(event beat.Event) { +func (c *FakeClient) Publish(event publisher.Event) { if c.PublishFunc != nil { c.PublishFunc(event) } @@ -76,7 +76,7 @@ func (c *FakeClient) Close() error { } // PublishAll calls PublishFunc for each event in the given slice. -func (c *FakeClient) PublishAll(events []beat.Event) { +func (c *FakeClient) PublishAll(events []publisher.Event) { for _, event := range events { c.Publish(event) } @@ -84,27 +84,27 @@ func (c *FakeClient) PublishAll(events []beat.Event) { // FailingConnector creates a pipeline that will always fail with the // configured error value. -func FailingConnector(err error) beat.PipelineConnector { +func FailingConnector(err error) publisher.PipelineConnector { return &FakeConnector{ - ConnectFunc: func(_ beat.ClientConfig) (beat.Client, error) { + ConnectFunc: func(_ publisher.ClientConfig) (publisher.Client, error) { return nil, err }, } } -// ConstClient returns a pipeline that always returns the pre-configured beat.Client instance. -func ConstClient(client beat.Client) beat.PipelineConnector { +// ConstClient returns a pipeline that always returns the pre-configured publisher.Client instance. +func ConstClient(client publisher.Client) publisher.PipelineConnector { return &FakeConnector{ - ConnectFunc: func(_ beat.ClientConfig) (beat.Client, error) { + ConnectFunc: func(_ publisher.ClientConfig) (publisher.Client, error) { return client, nil }, } } -// ChClient create a beat.Client that will forward all events to the given channel. -func ChClient(ch chan beat.Event) beat.Client { +// ChClient create a publisher.Client that will forward all events to the given channel. +func ChClient(ch chan publisher.Event) publisher.Client { return &FakeClient{ - PublishFunc: func(event beat.Event) { + PublishFunc: func(event publisher.Event) { ch <- event }, } @@ -118,9 +118,9 @@ func (c *ClientCounter) Total() int { return c.total.Load() } // BuildConnector create a pipeline that updates the active and tocal // connection counters on Connect and Close calls. -func (c *ClientCounter) BuildConnector() beat.PipelineConnector { +func (c *ClientCounter) BuildConnector() publisher.PipelineConnector { return FakeConnector{ - ConnectFunc: func(_ beat.ClientConfig) (beat.Client, error) { + ConnectFunc: func(_ publisher.ClientConfig) (publisher.Client, error) { c.total.Inc() c.active.Inc() return &FakeClient{ diff --git a/pkg/publisher/testing/testing.go b/pkg/publisher/testing/testing.go index 0c64e46..3fd7f25 100644 --- a/pkg/publisher/testing/testing.go +++ b/pkg/publisher/testing/testing.go @@ -17,48 +17,47 @@ package testing +import "github.com/elastic/elastic-agent-inputs/pkg/publisher" + // ChanClient implements Client interface, forwarding published events to some -import ( - "github.com/elastic/beats/v7/libbeat/beat" -) type TestPublisher struct { - client beat.Client + client publisher.Client } // given channel only. type ChanClient struct { done chan struct{} - Channel chan beat.Event - publishCallback func(event beat.Event) + Channel chan publisher.Event + publishCallback func(event publisher.Event) } -func PublisherWithClient(client beat.Client) beat.Pipeline { +func PublisherWithClient(client publisher.Client) publisher.Pipeline { return &TestPublisher{client} } -func (pub *TestPublisher) Connect() (beat.Client, error) { +func (pub *TestPublisher) Connect() (publisher.Client, error) { return pub.client, nil } -func (pub *TestPublisher) ConnectWith(_ beat.ClientConfig) (beat.Client, error) { +func (pub *TestPublisher) ConnectWith(_ publisher.ClientConfig) (publisher.Client, error) { return pub.client, nil } -func NewChanClientWithCallback(bufSize int, callback func(event beat.Event)) *ChanClient { - chanClient := NewChanClientWith(make(chan beat.Event, bufSize)) +func NewChanClientWithCallback(bufSize int, callback func(event publisher.Event)) *ChanClient { + chanClient := NewChanClientWith(make(chan publisher.Event, bufSize)) chanClient.publishCallback = callback return chanClient } func NewChanClient(bufSize int) *ChanClient { - return NewChanClientWith(make(chan beat.Event, bufSize)) + return NewChanClientWith(make(chan publisher.Event, bufSize)) } -func NewChanClientWith(ch chan beat.Event) *ChanClient { +func NewChanClientWith(ch chan publisher.Event) *ChanClient { if ch == nil { - ch = make(chan beat.Event, 1) + ch = make(chan publisher.Event, 1) } c := &ChanClient{ done: make(chan struct{}), @@ -74,7 +73,7 @@ func (c *ChanClient) Close() error { // PublishEvent will publish the event on the channel. Options will be ignored. // Always returns true. -func (c *ChanClient) Publish(event beat.Event) { +func (c *ChanClient) Publish(event publisher.Event) { select { case <-c.done: case c.Channel <- event: @@ -85,12 +84,12 @@ func (c *ChanClient) Publish(event beat.Event) { } } -func (c *ChanClient) PublishAll(event []beat.Event) { +func (c *ChanClient) PublishAll(event []publisher.Event) { for _, e := range event { c.Publish(e) } } -func (c *ChanClient) ReceiveEvent() beat.Event { +func (c *ChanClient) ReceiveEvent() publisher.Event { return <-c.Channel } diff --git a/pkg/publisher/testing/testing_test.go b/pkg/publisher/testing/testing_test.go index 2098306..3c34c19 100644 --- a/pkg/publisher/testing/testing_test.go +++ b/pkg/publisher/testing/testing_test.go @@ -22,14 +22,14 @@ import ( "github.com/stretchr/testify/assert" - "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-inputs/pkg/publisher" "github.com/elastic/elastic-agent-libs/mapstr" ) var cnt = 0 -func testEvent() beat.Event { - event := beat.Event{ +func testEvent() publisher.Event { + event := publisher.Event{ Fields: mapstr.M{ "message": "test", "idx": cnt, @@ -52,7 +52,7 @@ func TestChanClientPublishEvents(t *testing.T) { cc := NewChanClient(1) e1, e2 := testEvent(), testEvent() - go cc.PublishAll([]beat.Event{e1, e2}) + go cc.PublishAll([]publisher.Event{e1, e2}) assert.Equal(t, e1, cc.ReceiveEvent()) assert.Equal(t, e2, cc.ReceiveEvent()) } diff --git a/pkg/statestore/backend/memlog/diskstore.go b/pkg/statestore/backend/memlog/diskstore.go index 6096327..e4843fa 100644 --- a/pkg/statestore/backend/memlog/diskstore.go +++ b/pkg/statestore/backend/memlog/diskstore.go @@ -29,7 +29,7 @@ import ( "sort" "strconv" - "github.com/elastic/beats/v7/libbeat/common/cleanup" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/cleanup" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) diff --git a/pkg/statestore/backend/memlog/doc.go b/pkg/statestore/backend/memlog/doc.go index 276daf9..514098b 100644 --- a/pkg/statestore/backend/memlog/doc.go +++ b/pkg/statestore/backend/memlog/doc.go @@ -56,7 +56,7 @@ // will trigger a checkpoint operation and reset the log file. // // The store might contain multiple data files, but only the last data file is -// supposed to be valid. Older data files will continiously tried to be cleaned up +// supposed to be valid. Older data files will continuously tried to be cleaned up // on checkpoint operations. // The data files filenames do include the change sequence number. Which allows // us to sort them by name. The checkpoint operation of memlog, writes the full diff --git a/pkg/statestore/backend/memlog/memlog.go b/pkg/statestore/backend/memlog/memlog.go index 8864d9c..e7a61bd 100644 --- a/pkg/statestore/backend/memlog/memlog.go +++ b/pkg/statestore/backend/memlog/memlog.go @@ -22,7 +22,7 @@ import ( "path/filepath" "sync" - "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/backend" "github.com/elastic/elastic-agent-libs/logp" ) diff --git a/pkg/statestore/backend/memlog/memlog_test.go b/pkg/statestore/backend/memlog/memlog_test.go index c2f1a28..05585a8 100644 --- a/pkg/statestore/backend/memlog/memlog_test.go +++ b/pkg/statestore/backend/memlog/memlog_test.go @@ -29,14 +29,17 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/beats/v7/libbeat/statestore" - "github.com/elastic/beats/v7/libbeat/statestore/backend" - "github.com/elastic/beats/v7/libbeat/statestore/internal/storecompliance" + "github.com/elastic/elastic-agent-inputs/pkg/statestore" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/backend" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/internal/storecompliance" "github.com/elastic/elastic-agent-libs/logp" ) func init() { - logp.DevelopmentSetup() + err := logp.DevelopmentSetup() + if err != nil { + panic(err) + } } func TestCompliance_Default(t *testing.T) { @@ -139,7 +142,7 @@ func testLoadVersion1Case(t *testing.T, dataPath string) { // check store does not contain any additional keys func() { err = store.Each(func(key string, val statestore.ValueDecoder) (bool, error) { - _, exists := expected.Entries[string(key)] + _, exists := expected.Entries[key] if !exists { t.Errorf("unexpected key: %s", key) } @@ -251,8 +254,3 @@ func isDir(path string) bool { info, err := os.Stat(path) return err == nil && info.IsDir() } - -func isFile(path string) bool { - info, err := os.Stat(path) - return err == nil && info.Mode().IsRegular() -} diff --git a/pkg/statestore/backend/memlog/store.go b/pkg/statestore/backend/memlog/store.go index 5bd6aac..e2de297 100644 --- a/pkg/statestore/backend/memlog/store.go +++ b/pkg/statestore/backend/memlog/store.go @@ -24,10 +24,10 @@ import ( "path/filepath" "sync" - "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" - "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/backend" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/transform/typeconv" ) // store implements an actual memlog based store. diff --git a/pkg/statestore/backend/memlog/util.go b/pkg/statestore/backend/memlog/util.go index e2c5d4e..d788c4e 100644 --- a/pkg/statestore/backend/memlog/util.go +++ b/pkg/statestore/backend/memlog/util.go @@ -18,6 +18,7 @@ package memlog import ( + "errors" "io" "os" "runtime" @@ -63,7 +64,7 @@ func (e *ensureWriter) Write(p []byte) (int, error) { } func isRetryErr(err error) bool { - return err == syscall.EINTR || err == syscall.EAGAIN + return errors.Is(err, syscall.EINTR) || errors.Is(err, syscall.EAGAIN) } // trySyncPath provides a best-effort fsync on path (directory). The fsync is required by some @@ -75,7 +76,7 @@ func trySyncPath(path string) { return // ignore error, sync on dir must not be necessarily supported by the FS } defer f.Close() - syncFile(f) + _ = syncFile(f) } // pathEnsurePermissions checks if the file permissions for the given file match wantPerm. diff --git a/pkg/statestore/backend/memlog/util_test.go b/pkg/statestore/backend/memlog/util_test.go index fca2a2b..2f34b6e 100644 --- a/pkg/statestore/backend/memlog/util_test.go +++ b/pkg/statestore/backend/memlog/util_test.go @@ -18,6 +18,7 @@ package memlog import ( + "errors" "syscall" "testing" ) @@ -57,7 +58,7 @@ func TestEnsureWriter_NonRetriableError(t *testing.T) { bytes := []byte{1, 2, 3} writer := &ensureWriter{errorWriter} written, err := writer.Write(bytes) - if err != syscall.EINVAL { + if !errors.Is(err, syscall.EINVAL) { t.Fatalf("ensureWriter should propagate nonretriable errors") } if written != 0 { diff --git a/pkg/statestore/cleanup/cleanup.go b/pkg/statestore/cleanup/cleanup.go index 5b13546..10e6d64 100644 --- a/pkg/statestore/cleanup/cleanup.go +++ b/pkg/statestore/cleanup/cleanup.go @@ -19,7 +19,7 @@ // // Use the helpers with `defer`. For example use IfNot with `defer`, such that // cleanup functions will be executed if `check` is false, no matter if an -// error has been returned or an panic has occured. +// error has been returned or an panic has occurred. // // initOK := false // defer cleanup.IfNot(&initOK, func() { @@ -59,7 +59,7 @@ func IfNotPred(pred func() bool, cleanup func()) { } } -// WithError returns a cleanup function calling a custom handler if an error occured. +// WithError returns a cleanup function calling a custom handler if an error occurred. func WithError(fn func(error), cleanup func() error) func() { return func() { if err := cleanup(); err != nil { diff --git a/pkg/statestore/cleanup/cleanup_test.go b/pkg/statestore/cleanup/cleanup_test.go index 6b45b9d..b64bae1 100644 --- a/pkg/statestore/cleanup/cleanup_test.go +++ b/pkg/statestore/cleanup/cleanup_test.go @@ -22,7 +22,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/elastic/beats/v7/libbeat/common/cleanup" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/cleanup" ) func TestIfBool(t *testing.T) { diff --git a/pkg/statestore/error.go b/pkg/statestore/error.go index eac8bc8..85438ac 100644 --- a/pkg/statestore/error.go +++ b/pkg/statestore/error.go @@ -22,7 +22,7 @@ import ( "fmt" ) -// ErrorAccess indicates that an error occured when trying to open a Store. +// ErrorAccess indicates that an error occurred when trying to open a Store. type ErrorAccess struct { name string cause error @@ -84,8 +84,5 @@ func (e *ErrorOperation) Error() string { // IsClosed returns true if the cause for an Error is ErrorClosed. func IsClosed(err error) bool { var tmp *ErrorClosed - if errors.As(err, &tmp) { - return true - } - return false + return errors.As(err, &tmp) } diff --git a/pkg/statestore/internal/storecompliance/reg.go b/pkg/statestore/internal/storecompliance/reg.go index 924a15c..a769361 100644 --- a/pkg/statestore/internal/storecompliance/reg.go +++ b/pkg/statestore/internal/storecompliance/reg.go @@ -20,12 +20,12 @@ package storecompliance import ( "testing" - "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/backend" ) // Registry helper for writing tests. // The registry uses a testing.T and provides some MustX methods that fail if -// an error occured. +// an error occurred. type Registry struct { T testing.TB backend.Registry @@ -34,7 +34,7 @@ type Registry struct { // Store helper for writing tests. // The store needs a reference to the Registry with the current test context. // The Store provides additional helpers for reopening the store, MustX methods -// that will fail the test if an error has occured. +// that will fail the test if an error has occurred. type Store struct { backend.Store @@ -51,7 +51,7 @@ func (r *Registry) Access(name string) (*Store, error) { return &Store{Store: s, Registry: r, name: name}, nil } -// MustAccess opens a Store. It fails the test if an error has occured. +// MustAccess opens a Store. It fails the test if an error has occurred. func (r *Registry) MustAccess(name string) *Store { store, err := r.Access(name) must(r.T, err, "open store") @@ -87,26 +87,26 @@ func (s *Store) Reopen() { s.Store = store } -// MustHave fails the test if an error occured in a call to Has. +// MustHave fails the test if an error occurred in a call to Has. func (s *Store) MustHave(key string) bool { b, err := s.Has(key) must(s.Registry.T, err, "unexpected error on store/has call") return b } -// MustGet fails the test if an error occured in a call to Get. +// MustGet fails the test if an error occurred in a call to Get. func (s *Store) MustGet(key string, into interface{}) { err := s.Get(key, into) must(s.Registry.T, err, "unexpected error on store/get call") } -// MustSet fails the test if an error occured in a call to Set. +// MustSet fails the test if an error occurred in a call to Set. func (s *Store) MustSet(key string, from interface{}) { err := s.Set(key, from) must(s.Registry.T, err, "unexpected error on store/set call") } -// MustRemove fails the test if an error occured in a call to Remove. +// MustRemove fails the test if an error occurred in a call to Remove. func (s *Store) MustRemove(key string) { err := s.Store.Remove(key) must(s.Registry.T, err, "unexpected error remove key") diff --git a/pkg/statestore/internal/storecompliance/storecompliance.go b/pkg/statestore/internal/storecompliance/storecompliance.go index 862144f..4fa177d 100644 --- a/pkg/statestore/internal/storecompliance/storecompliance.go +++ b/pkg/statestore/internal/storecompliance/storecompliance.go @@ -35,7 +35,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/backend" ) // BackendFactory is used by TestBackendCompliance to create diff --git a/pkg/statestore/internal/storecompliance/util.go b/pkg/statestore/internal/storecompliance/util.go index d5a8848..a6355fd 100644 --- a/pkg/statestore/internal/storecompliance/util.go +++ b/pkg/statestore/internal/storecompliance/util.go @@ -23,7 +23,7 @@ import ( "os" "testing" - "github.com/elastic/beats/v7/libbeat/common/cleanup" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/cleanup" ) // RunWithPath uses the factory to create and configure a registry with a @@ -39,7 +39,7 @@ func RunWithPath(t *testing.T, factory BackendFactory, fn func(*Registry)) { // WithPath wraps a registry aware test function into a normalized test // function that can be used with `t.Run`. // The factory is used to create and configure the registry with a temporary -// test path. The registry is closed and the temporary test directoy is delete +// test path. The registry is closed and the temporary test directory is delete // if the test function returns or panics. func WithPath(factory BackendFactory, fn func(*testing.T, *Registry)) func(t *testing.T) { return func(t *testing.T) { diff --git a/pkg/statestore/mock_test.go b/pkg/statestore/mock_test.go index 69a1d80..007fb1e 100644 --- a/pkg/statestore/mock_test.go +++ b/pkg/statestore/mock_test.go @@ -18,9 +18,11 @@ package statestore import ( + "fmt" + "github.com/stretchr/testify/mock" - "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/backend" ) type mockRegistry struct { @@ -39,7 +41,11 @@ func (m *mockRegistry) Access(name string) (backend.Store, error) { var store backend.Store if ifc := args.Get(0); ifc != nil { - store = ifc.(backend.Store) + var ok bool + store, ok = ifc.(backend.Store) + if !ok { + panic(fmt.Errorf("cannot convert interface to backend.Store: %v", ifc)) + } } return store, args.Error(1) diff --git a/pkg/statestore/registry.go b/pkg/statestore/registry.go index aca20c7..8828847 100644 --- a/pkg/statestore/registry.go +++ b/pkg/statestore/registry.go @@ -20,7 +20,7 @@ package statestore import ( "sync" - "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/backend" ) // Registry manages multiple key-value stores. @@ -67,7 +67,9 @@ func (r *Registry) Get(name string) (*Store, error) { } shared = newSharedStore(r, name, backend) - defer shared.Release() + defer func() { + _ = shared.Release() + }() r.active[name] = shared r.wg.Add(1) diff --git a/pkg/statestore/store.go b/pkg/statestore/store.go index a776efa..8a270ca 100644 --- a/pkg/statestore/store.go +++ b/pkg/statestore/store.go @@ -18,7 +18,7 @@ package statestore import ( - "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/backend" "github.com/elastic/go-concert/atomic" "github.com/elastic/go-concert/unison" ) diff --git a/pkg/statestore/store_test.go b/pkg/statestore/store_test.go index 7a7833b..8ce4312 100644 --- a/pkg/statestore/store_test.go +++ b/pkg/statestore/store_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/beats/v7/libbeat/statestore/storetest" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/storetest" ) func TestStore_Close(t *testing.T) { diff --git a/pkg/statestore/storetest/storetest.go b/pkg/statestore/storetest/storetest.go index cd6d07d..cc2252c 100644 --- a/pkg/statestore/storetest/storetest.go +++ b/pkg/statestore/storetest/storetest.go @@ -22,8 +22,8 @@ import ( "errors" "sync" - "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" - "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/backend" + "github.com/elastic/elastic-agent-libs/transform/typeconv" ) // MemoryStore provides a dummy backend store that holds all access stores and @@ -140,7 +140,7 @@ func (s *MapStore) Has(key string) (bool, error) { } // Get returns a key value pair from the store. An error is returned if the -// store has been closed, the key is unknown, or an decoding error occured. +// store has been closed, the key is unknown, or an decoding error occurred. func (s *MapStore) Get(key string, into interface{}) error { s.mu.RLock() defer s.mu.RUnlock() diff --git a/pkg/statestore/storetest/storetest_test.go b/pkg/statestore/storetest/storetest_test.go index 129c386..3d981b9 100644 --- a/pkg/statestore/storetest/storetest_test.go +++ b/pkg/statestore/storetest/storetest_test.go @@ -22,13 +22,16 @@ import ( "github.com/stretchr/testify/assert" - "github.com/elastic/beats/v7/libbeat/statestore/backend" - "github.com/elastic/beats/v7/libbeat/statestore/internal/storecompliance" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/backend" + "github.com/elastic/elastic-agent-inputs/pkg/statestore/internal/storecompliance" "github.com/elastic/elastic-agent-libs/logp" ) func init() { - logp.DevelopmentSetup() + err := logp.DevelopmentSetup() + if err != nil { + panic(err) + } } func TestCompliance(t *testing.T) {