Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Input v2 stateless manager #19406

Merged
merged 2 commits into from
Jun 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions filebeat/input/v2/input-stateless/stateless.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package stateless

import (
"fmt"
"runtime/debug"

"github.com/elastic/go-concert/unison"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)

// InputManager provides an InputManager for transient inputs, that do not store
// state in the registry or require end-to-end event acknowledgement.
type InputManager struct {
Configure func(*common.Config) (Input, error)
}

// Input is the interface transient inputs are required to implemented.
type Input interface {
Name() string
Test(v2.TestContext) error
Run(ctx v2.Context, publish Publisher) error
}

// Publisher is used by the Input to emit events.
type Publisher interface {
Publish(beat.Event)
}

type configuredInput struct {
input Input
}

var _ v2.InputManager = InputManager{}

// NewInputManager wraps the given configure function to create a new stateless input manager.
func NewInputManager(configure func(*common.Config) (Input, error)) InputManager {
return InputManager{Configure: configure}
}

// Init does nothing. Init is required to fullfil the v2.InputManager interface.
func (m InputManager) Init(_ unison.Group, _ v2.Mode) error { return nil }

// Create configures a transient input and ensures that the final input can be used with
// with the filebeat input architecture.
func (m InputManager) Create(cfg *common.Config) (v2.Input, error) {
inp, err := m.Configure(cfg)
if err != nil {
return nil, err
}
return configuredInput{inp}, nil
}

func (si configuredInput) Name() string { return si.input.Name() }

func (si configuredInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) (err error) {
defer func() {
if v := recover(); v != nil {
if e, ok := v.(error); ok {
err = e
} else {
err = fmt.Errorf("input panic with: %+v\n%s", v, debug.Stack())
}
}
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.DefaultGuarantees,

// configure pipeline to disconnect input on stop signal.
CloseRef: ctx.Cancelation,
})
if err != nil {
return err
}

defer client.Close()
return si.input.Run(ctx, client)
}

func (si configuredInput) Test(ctx v2.TestContext) error {
return si.input.Test(ctx)
}
193 changes: 193 additions & 0 deletions filebeat/input/v2/input-stateless/stateless_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package stateless_test

import (
"context"
"errors"
"runtime"
"sync"
"testing"

"github.com/stretchr/testify/require"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/atomic"
pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing"
)

type fakeStatelessInput struct {
OnTest func(v2.TestContext) error
OnRun func(v2.Context, stateless.Publisher) error
}

func TestStateless_Run(t *testing.T) {
t.Run("events are published", func(t *testing.T) {
const numEvents = 5

ch := make(chan beat.Event)
connector := pubtest.ConstClient(pubtest.ChClient(ch))

input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{
OnRun: func(ctx v2.Context, publisher stateless.Publisher) error {
defer close(ch)
for i := 0; i < numEvents; i++ {
publisher.Publish(beat.Event{Fields: map[string]interface{}{"id": i}})
}
return nil
},
}), nil)

var err error
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err = input.Run(v2.Context{}, connector)
}()

var receivedEvents int
for range ch {
receivedEvents++
}
wg.Wait()

require.NoError(t, err)
require.Equal(t, numEvents, receivedEvents)
})

t.Run("capture panic and return error", func(t *testing.T) {
input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{
OnRun: func(_ v2.Context, _ stateless.Publisher) error {
panic("oops")
},
}), nil)

var clientCounters pubtest.ClientCounter
err := input.Run(v2.Context{}, clientCounters.BuildConnector())

require.Error(t, err)
require.Equal(t, 1, clientCounters.Total())
require.Equal(t, 0, clientCounters.Active())
})

t.Run("publisher unblocks if shutdown signal is send", func(t *testing.T) {
// the input blocks in the publisher. We loop until the shutdown signal is received
var started atomic.Bool
input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{
OnRun: func(ctx v2.Context, publisher stateless.Publisher) error {
for ctx.Cancelation.Err() == nil {
started.Store(true)
publisher.Publish(beat.Event{
Fields: common.MapStr{
"hello": "world",
},
})
}
return ctx.Cancelation.Err()
},
}), nil)

// connector creates a client the blocks forever until the shutdown signal is received
var publishCalls atomic.Int
connector := pubtest.FakeConnector{
ConnectFunc: func(config beat.ClientConfig) (beat.Client, error) {
return &pubtest.FakeClient{
PublishFunc: func(event beat.Event) {
publishCalls.Inc()
<-config.CloseRef.Done()
},
}, nil
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup
var err error
wg.Add(1)
go func() {
defer wg.Done()
err = input.Run(v2.Context{Cancelation: ctx}, connector)
}()

// signal and wait for shutdown
for !started.Load() {
runtime.Gosched()
}
cancel()
wg.Wait()

// validate
require.Equal(t, context.Canceled, err)
require.Equal(t, 1, publishCalls.Load())
})

t.Run("do not start input of pipeline connection fails", func(t *testing.T) {
errOpps := errors.New("oops")
connector := pubtest.FailingConnector(errOpps)

var run atomic.Int
input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{
OnRun: func(_ v2.Context, publisher stateless.Publisher) error {
run.Inc()
return nil
},
}), nil)

err := input.Run(v2.Context{}, connector)
require.True(t, errors.Is(err, errOpps))
require.Equal(t, 0, run.Load())
})
}

func (f *fakeStatelessInput) Name() string { return "test" }

func (f *fakeStatelessInput) Test(ctx v2.TestContext) error {
if f.OnTest != nil {
return f.OnTest(ctx)
}
return nil
}

func (f *fakeStatelessInput) Run(ctx v2.Context, publish stateless.Publisher) error {
if f.OnRun != nil {
return f.OnRun(ctx, publish)
}
return errors.New("oops, run not implemented")
}

func createConfiguredInput(t *testing.T, manager stateless.InputManager, config map[string]interface{}) v2.Input {
input, err := manager.Create(common.MustNewConfigFrom(config))
require.NoError(t, err)
return input
}

func constInputManager(input stateless.Input) stateless.InputManager {
return stateless.NewInputManager(constInput(input))
}

func constInput(input stateless.Input) func(*common.Config) (stateless.Input, error) {
return func(_ *common.Config) (stateless.Input, error) {
return input, nil
}
}