Skip to content
This repository has been archived by the owner on Sep 20, 2023. It is now read-only.

Commit

Permalink
fixes after moving things around
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Jun 1, 2022
1 parent 26879d7 commit 0a79947
Show file tree
Hide file tree
Showing 52 changed files with 445 additions and 387 deletions.
4 changes: 2 additions & 2 deletions pkg/feature/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Featurable interface {
// of the method is type checked by the 'FindFactory' of each namespace.
Factory() interface{}

// Description return the avaiable information for a specific feature.
// Description return the available information for a specific feature.
Description() Details

String() string
Expand Down Expand Up @@ -71,7 +71,7 @@ func (f *Feature) Factory() interface{} {
return f.factory
}

// Description return the avaiable information for a specific feature.
// Description return the available information for a specific feature.
func (f *Feature) Description() Details {
return f.description
}
Expand Down
47 changes: 30 additions & 17 deletions pkg/feature/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestRegister(t *testing.T) {

t.Run("namespace exists and feature doesn't exist", func(t *testing.T) {
r := NewRegistry()
r.Register(New("processor", "bar", f, defaultDetails))
mustRegister(r, New("processor", "bar", f, defaultDetails))
err := r.Register(New("processor", "foo", f, defaultDetails))
if !assert.NoError(t, err) {
return
Expand All @@ -59,7 +59,7 @@ func TestRegister(t *testing.T) {

t.Run("namespace exists and feature exists and not the same factory", func(t *testing.T) {
r := NewRegistry()
r.Register(New("processor", "foo", func() {}, defaultDetails))
mustRegister(r, New("processor", "foo", func() {}, defaultDetails))
err := r.Register(New("processor", "foo", f, defaultDetails))
if !assert.Error(t, err) {
return
Expand All @@ -70,7 +70,7 @@ func TestRegister(t *testing.T) {
t.Run("when the exact feature is already registered", func(t *testing.T) {
feature := New("processor", "foo", f, defaultDetails)
r := NewRegistry()
r.Register(feature)
mustRegister(r, feature)
err := r.Register(feature)
if !assert.NoError(t, err) {
return
Expand All @@ -79,12 +79,19 @@ func TestRegister(t *testing.T) {
})
}

func mustRegister(r *Registry, f Featurable) {
err := r.Register(f)
if err != nil {
panic(err)
}
}

func TestFeature(t *testing.T) {
f := func() {}

r := NewRegistry()
r.Register(New("processor", "foo", f, defaultDetails))
r.Register(New("HOLA", "fOO", f, defaultDetails))
mustRegister(r, New("processor", "foo", f, defaultDetails))
mustRegister(r, New("HOLA", "fOO", f, defaultDetails))

t.Run("when namespace and feature are present", func(t *testing.T) {
feature, err := r.Lookup("processor", "foo")
Expand Down Expand Up @@ -113,9 +120,9 @@ func TestLookup(t *testing.T) {
f := func() {}

r := NewRegistry()
r.Register(New("processor", "foo", f, defaultDetails))
r.Register(New("processor", "foo2", f, defaultDetails))
r.Register(New("HELLO", "fOO", f, defaultDetails))
mustRegister(r, New("processor", "foo", f, defaultDetails))
mustRegister(r, New("processor", "foo2", f, defaultDetails))
mustRegister(r, New("HELLO", "fOO", f, defaultDetails))

t.Run("when namespace and feature are present", func(t *testing.T) {
features, err := r.LookupAll("processor")
Expand Down Expand Up @@ -147,9 +154,10 @@ func TestUnregister(t *testing.T) {

t.Run("when the namespace and the feature exists", func(t *testing.T) {
r := NewRegistry()
r.Register(New("processor", "foo", f, defaultDetails))
err := r.Register(New("processor", "foo", f, defaultDetails))
assert.NoError(t, err)
assert.Equal(t, 1, r.Size())
err := r.Unregister("processor", "foo")
err = r.Unregister("processor", "foo")
if !assert.NoError(t, err) {
return
}
Expand All @@ -158,9 +166,10 @@ func TestUnregister(t *testing.T) {

t.Run("when the namespace exist and the feature doesn't", func(t *testing.T) {
r := NewRegistry()
r.Register(New("processor", "foo", f, defaultDetails))
err := r.Register(New("processor", "foo", f, defaultDetails))
assert.NoError(t, err)
assert.Equal(t, 1, r.Size())
err := r.Unregister("processor", "bar")
err = r.Unregister("processor", "bar")
if assert.Error(t, err) {
return
}
Expand All @@ -169,9 +178,10 @@ func TestUnregister(t *testing.T) {

t.Run("when the namespace doesn't exists", func(t *testing.T) {
r := NewRegistry()
r.Register(New("processor", "foo", f, defaultDetails))
err := r.Register(New("processor", "foo", f, defaultDetails))
assert.NoError(t, err)
assert.Equal(t, 1, r.Size())
err := r.Unregister("outputs", "bar")
err = r.Unregister("outputs", "bar")
if assert.Error(t, err) {
return
}
Expand All @@ -184,18 +194,21 @@ func TestOverwrite(t *testing.T) {
f := func() {}
r := NewRegistry()
assert.Equal(t, 0, r.Size())
r.Overwrite(New("processor", "foo", f, defaultDetails))
err := r.Overwrite(New("processor", "foo", f, defaultDetails))
assert.NoError(t, err)
assert.Equal(t, 1, r.Size())
})

t.Run("overwrite when the feature exists", func(t *testing.T) {
f := func() {}
r := NewRegistry()
r.Register(New("processor", "foo", f, defaultDetails))
err := r.Register(New("processor", "foo", f, defaultDetails))
assert.NoError(t, err)
assert.Equal(t, 1, r.Size())

check := 42
r.Overwrite(New("processor", "foo", check, defaultDetails))
err = r.Overwrite(New("processor", "foo", check, defaultDetails))
assert.NoError(t, err)
assert.Equal(t, 1, r.Size())

feature, err := r.Lookup("processor", "foo")
Expand Down
2 changes: 1 addition & 1 deletion pkg/manager/input/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package v2
package input

import (
"errors"
Expand Down
2 changes: 1 addition & 1 deletion pkg/manager/input/input-cursor/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type cleaner struct {
// once the last event has been ACKed.
func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Duration) {
started := time.Now()
timed.Periodic(canceler, interval, func() error {
_ = timed.Periodic(canceler, interval, func() error {
gcStore(c.log, started, store)
return nil
})
Expand Down
12 changes: 6 additions & 6 deletions pkg/manager/input/input-cursor/clean_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestGCStore(t *testing.T) {
started := time.Now()

backend := createSampleStore(t, nil)
store := testOpenStore(t, "test", backend)
store := testOpenStore(t, backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)
Expand All @@ -52,7 +52,7 @@ func TestGCStore(t *testing.T) {
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
store := testOpenStore(t, backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)
Expand All @@ -72,7 +72,7 @@ func TestGCStore(t *testing.T) {
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
store := testOpenStore(t, backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)
Expand All @@ -93,7 +93,7 @@ func TestGCStore(t *testing.T) {
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
store := testOpenStore(t, backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)
Expand All @@ -113,7 +113,7 @@ func TestGCStore(t *testing.T) {
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
store := testOpenStore(t, backend)
defer store.Release()

// access resource and check it is not gc'ed
Expand All @@ -140,7 +140,7 @@ func TestGCStore(t *testing.T) {
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
store := testOpenStore(t, backend)
defer store.Release()

// create pending update operation
Expand Down
16 changes: 8 additions & 8 deletions pkg/manager/input/input-cursor/cursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ import (

func TestCursor_IsNew(t *testing.T) {
t.Run("true if key is not in store", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, nil))
store := testOpenStore(t, createSampleStore(t, nil))
defer store.Release()

cursor := makeCursor(store, store.Get("test::key"))
require.True(t, cursor.IsNew())
})

t.Run("true if key is in store but without cursor value", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, map[string]state{
store := testOpenStore(t, createSampleStore(t, map[string]state{
"test::key": {Cursor: nil},
}))
defer store.Release()
Expand All @@ -43,7 +43,7 @@ func TestCursor_IsNew(t *testing.T) {
})

t.Run("false if key with cursor value is in persistent store", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, map[string]state{
store := testOpenStore(t, createSampleStore(t, map[string]state{
"test::key": {Cursor: "test"},
}))
defer store.Release()
Expand All @@ -53,7 +53,7 @@ func TestCursor_IsNew(t *testing.T) {
})

t.Run("false if key with cursor value is in memory store only", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, map[string]state{
store := testOpenStore(t, createSampleStore(t, map[string]state{
"test::key": {Cursor: nil},
}))
defer store.Release()
Expand All @@ -70,7 +70,7 @@ func TestCursor_IsNew(t *testing.T) {

func TestCursor_Unpack(t *testing.T) {
t.Run("nothing to unpack if key is new", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, nil))
store := testOpenStore(t, createSampleStore(t, nil))
defer store.Release()

var st string
Expand All @@ -81,7 +81,7 @@ func TestCursor_Unpack(t *testing.T) {
})

t.Run("unpack fails if types are not compatible", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, map[string]state{
store := testOpenStore(t, createSampleStore(t, map[string]state{
"test::key": {Cursor: "test"},
}))
defer store.Release()
Expand All @@ -92,7 +92,7 @@ func TestCursor_Unpack(t *testing.T) {
})

t.Run("unpack from state in persistent store", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, map[string]state{
store := testOpenStore(t, createSampleStore(t, map[string]state{
"test::key": {Cursor: "test"},
}))
defer store.Release()
Expand All @@ -105,7 +105,7 @@ func TestCursor_Unpack(t *testing.T) {
})

t.Run("unpack from in memory state if updates are pending", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, map[string]state{
store := testOpenStore(t, createSampleStore(t, map[string]state{
"test::key": {Cursor: "test"},
}))
defer store.Release()
Expand Down
21 changes: 10 additions & 11 deletions pkg/manager/input/input-cursor/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ import (
"github.com/elastic/go-concert/ctxtool"
"github.com/elastic/go-concert/unison"

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-inputs/pkg/manager/input"
"github.com/elastic/elastic-agent-inputs/pkg/publisher"
"github.com/elastic/elastic-agent-inputs/pkg/publisher/acker"
)

// Input interface for cursor based inputs. This interface must be implemented
Expand Down Expand Up @@ -102,7 +101,7 @@ func (inp *managedInput) testSource(ctx input.TestContext, source Source) (err e
// issue, but not crash the whole process.
func (inp *managedInput) Run(
ctx input.Context,
pipeline beat.PipelineConnector,
pipeline publisher.PipelineConnector,
) (err error) {
// Setup cancellation using a custom cancel context. All workers will be
// stopped if one failed badly by returning an error.
Expand Down Expand Up @@ -136,7 +135,7 @@ func (inp *managedInput) runSource(
ctx input.Context,
store *store,
source Source,
pipeline beat.PipelineConnector,
pipeline publisher.PipelineConnector,
) (err error) {
defer func() {
if v := recover(); v != nil {
Expand All @@ -145,9 +144,9 @@ func (inp *managedInput) runSource(
}
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
client, err := pipeline.ConnectWith(publisher.ClientConfig{
CloseRef: ctx.Cancelation,
ACKHandler: newInputACKHandler(ctx.Logger),
ACKHandler: newInputACKHandler(),
})
if err != nil {
return err
Expand All @@ -164,8 +163,8 @@ func (inp *managedInput) runSource(
store.UpdateTTL(resource, inp.cleanTimeout)

cursor := makeCursor(store, resource)
publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor}
return inp.input.Run(ctx, source, cursor, publisher)
p := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor}
return inp.input.Run(ctx, source, cursor, p)
}

func (inp *managedInput) createSourceID(s Source) string {
Expand All @@ -175,7 +174,7 @@ func (inp *managedInput) createSourceID(s Source) string {
return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name())
}

func newInputACKHandler(log *logp.Logger) beat.ACKer {
func newInputACKHandler() publisher.ACKer {
return acker.EventPrivateReporter(func(acked int, private []interface{}) {
var n uint
var last int
Expand Down
Loading

0 comments on commit 0a79947

Please sign in to comment.