Skip to content

Commit

Permalink
Add publisher implementation for stateful inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed Jul 1, 2020
1 parent a9f55c4 commit 5367a39
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 4 deletions.
53 changes: 49 additions & 4 deletions filebeat/input/v2/input-cursor/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
"github.com/elastic/beats/v7/libbeat/statestore"
)

// Publisher is used to publish an event and update the cursor in a single call to Publish.
Expand Down Expand Up @@ -64,12 +65,25 @@ type updateOp struct {
// The ACK ordering in the publisher pipeline guarantees that update operations
// will be ACKed and executed in the correct order.
func (c *cursorPublisher) Publish(event beat.Event, cursorUpdate interface{}) error {
panic("TODO: implement me")
if cursorUpdate == nil {
return c.forward(event)
}

op, err := createUpdateOp(c.cursor.store, c.cursor.resource, cursorUpdate)
if err != nil {
return err
}

event.Private = op
return c.forward(event)
}

// Execute updates the persistent store with the scheduled changes and releases the resource.
func (op *updateOp) Execute(numEvents uint) {
panic("TODO: implement me")
func (c *cursorPublisher) forward(event beat.Event) error {
c.client.Publish(event)
if c.canceler == nil {
return nil
}
return c.canceler.Err()
}

func createUpdateOp(store *store, resource *resource, updates interface{}) (*updateOp, error) {
Expand Down Expand Up @@ -106,3 +120,34 @@ func (op *updateOp) done(n uint) {
op.resource = nil
*op = updateOp{}
}

// Execute updates the persistent store with the scheduled changes and releases the resource.
func (op *updateOp) Execute(n uint) {
resource := op.resource
defer op.done(n)

resource.stateMutex.Lock()
defer resource.stateMutex.Unlock()

resource.activeCursorOperations -= n
if resource.activeCursorOperations == 0 {
resource.cursor = resource.pendingCursor
resource.pendingCursor = nil
} else {
typeconv.Convert(&resource.cursor, op.delta)
}

if resource.internalState.Updated.Before(op.timestamp) {
resource.internalState.Updated = op.timestamp
}

err := op.store.persistentStore.Set(resource.key, resource.inSyncStateSnapshot())
if err != nil {
if !statestore.IsClosed(err) {
op.store.log.Errorf("Failed to update state in the registry for '%v'", resource.key)
}
} else {
resource.internalInSync = true
resource.stored = true
}
}
158 changes: 158 additions & 0 deletions filebeat/input/v2/input-cursor/publish_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cursor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing"
)

func TestPublish(t *testing.T) {
t.Run("event with cursor state creates update operation", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, nil))
defer store.Release()
cursor := makeCursor(store, store.Get("test::key"))

var actual beat.Event
client := &pubtest.FakeClient{
PublishFunc: func(event beat.Event) { actual = event },
}
publisher := cursorPublisher{nil, client, &cursor}
publisher.Publish(beat.Event{}, "test")

require.NotNil(t, actual.Private)
})

t.Run("event without cursor creates no update operation", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, nil))
defer store.Release()
cursor := makeCursor(store, store.Get("test::key"))

var actual beat.Event
client := &pubtest.FakeClient{
PublishFunc: func(event beat.Event) { actual = event },
}
publisher := cursorPublisher{nil, client, &cursor}
publisher.Publish(beat.Event{}, nil)
require.Nil(t, actual.Private)
})

t.Run("publish returns error if context has been cancelled", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
cancel()

store := testOpenStore(t, "test", createSampleStore(t, nil))
defer store.Release()
cursor := makeCursor(store, store.Get("test::key"))

publisher := cursorPublisher{ctx, &pubtest.FakeClient{}, &cursor}
err := publisher.Publish(beat.Event{}, nil)
require.Equal(t, context.Canceled, err)
})
}

func TestOp_Execute(t *testing.T) {
t.Run("applying final op marks the key as finished", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, nil))
defer store.Release()
res := store.Get("test::key")

// create op and release resource. The 'resource' must still be active
op := mustCreateUpdateOp(t, store, res, "test-updated-cursor-state")
res.Release()
require.False(t, res.Finished())

// this was the last op, the resource should become inactive
op.Execute(1)
require.True(t, res.Finished())

// validate state:
inSyncCursor := storeInSyncSnapshot(store)["test::key"].Cursor
inMemCursor := storeMemorySnapshot(store)["test::key"].Cursor
want := "test-updated-cursor-state"
assert.Equal(t, want, inSyncCursor)
assert.Equal(t, want, inMemCursor)
})

t.Run("acking multiple ops applies the latest update and marks key as finished", func(t *testing.T) {
// when acking N events, intermediate updates are dropped in favor of the latest update operation.
// This test checks that the resource is correctly marked as finished.

store := testOpenStore(t, "test", createSampleStore(t, nil))
defer store.Release()
res := store.Get("test::key")

// create update operations and release resource. The 'resource' must still be active
mustCreateUpdateOp(t, store, res, "test-updated-cursor-state-dropped")
op := mustCreateUpdateOp(t, store, res, "test-updated-cursor-state-final")
res.Release()
require.False(t, res.Finished())

// this was the last op, the resource should become inactive
op.Execute(2)
require.True(t, res.Finished())

// validate state:
inSyncCursor := storeInSyncSnapshot(store)["test::key"].Cursor
inMemCursor := storeMemorySnapshot(store)["test::key"].Cursor
want := "test-updated-cursor-state-final"
assert.Equal(t, want, inSyncCursor)
assert.Equal(t, want, inMemCursor)
})

t.Run("ACK only subset of pending ops will only update up to ACKed state", func(t *testing.T) {
// when acking N events, intermediate updates are dropped in favor of the latest update operation.
// This test checks that the resource is correctly marked as finished.

store := testOpenStore(t, "test", createSampleStore(t, nil))
defer store.Release()
res := store.Get("test::key")

// create update operations and release resource. The 'resource' must still be active
op1 := mustCreateUpdateOp(t, store, res, "test-updated-cursor-state-intermediate")
op2 := mustCreateUpdateOp(t, store, res, "test-updated-cursor-state-final")
res.Release()
require.False(t, res.Finished())

defer op2.done(1) // cleanup after test

// this was the intermediate op, the resource should still be active
op1.Execute(1)
require.False(t, res.Finished())

// validate state (in memory state is always up to data to most recent update):
inSyncCursor := storeInSyncSnapshot(store)["test::key"].Cursor
inMemCursor := storeMemorySnapshot(store)["test::key"].Cursor
assert.Equal(t, "test-updated-cursor-state-intermediate", inSyncCursor)
assert.Equal(t, "test-updated-cursor-state-final", inMemCursor)
})
}

func mustCreateUpdateOp(t *testing.T, store *store, resource *resource, updates interface{}) *updateOp {
op, err := createUpdateOp(store, resource, updates)
if err != nil {
t.Fatalf("Failed to create update op: %v", err)
}
return op
}

0 comments on commit 5367a39

Please sign in to comment.