-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
(cherry picked from commit e689fd1)
- Loading branch information
Steffen Siering
authored
Jul 8, 2020
1 parent
be0aa74
commit 80735bb
Showing
2 changed files
with
295 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package stateless | ||
|
||
import ( | ||
"fmt" | ||
"runtime/debug" | ||
|
||
"github.com/elastic/go-concert/unison" | ||
|
||
v2 "github.com/elastic/beats/v7/filebeat/input/v2" | ||
"github.com/elastic/beats/v7/libbeat/beat" | ||
"github.com/elastic/beats/v7/libbeat/common" | ||
) | ||
|
||
// InputManager provides an InputManager for transient inputs, that do not store | ||
// state in the registry or require end-to-end event acknowledgement. | ||
type InputManager struct { | ||
Configure func(*common.Config) (Input, error) | ||
} | ||
|
||
// Input is the interface transient inputs are required to implemented. | ||
type Input interface { | ||
Name() string | ||
Test(v2.TestContext) error | ||
Run(ctx v2.Context, publish Publisher) error | ||
} | ||
|
||
// Publisher is used by the Input to emit events. | ||
type Publisher interface { | ||
Publish(beat.Event) | ||
} | ||
|
||
type configuredInput struct { | ||
input Input | ||
} | ||
|
||
var _ v2.InputManager = InputManager{} | ||
|
||
// NewInputManager wraps the given configure function to create a new stateless input manager. | ||
func NewInputManager(configure func(*common.Config) (Input, error)) InputManager { | ||
return InputManager{Configure: configure} | ||
} | ||
|
||
// Init does nothing. Init is required to fullfil the v2.InputManager interface. | ||
func (m InputManager) Init(_ unison.Group, _ v2.Mode) error { return nil } | ||
|
||
// Create configures a transient input and ensures that the final input can be used with | ||
// with the filebeat input architecture. | ||
func (m InputManager) Create(cfg *common.Config) (v2.Input, error) { | ||
inp, err := m.Configure(cfg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return configuredInput{inp}, nil | ||
} | ||
|
||
func (si configuredInput) Name() string { return si.input.Name() } | ||
|
||
func (si configuredInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) (err error) { | ||
defer func() { | ||
if v := recover(); v != nil { | ||
if e, ok := v.(error); ok { | ||
err = e | ||
} else { | ||
err = fmt.Errorf("input panic with: %+v\n%s", v, debug.Stack()) | ||
} | ||
} | ||
}() | ||
|
||
client, err := pipeline.ConnectWith(beat.ClientConfig{ | ||
PublishMode: beat.DefaultGuarantees, | ||
|
||
// configure pipeline to disconnect input on stop signal. | ||
CloseRef: ctx.Cancelation, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
defer client.Close() | ||
return si.input.Run(ctx, client) | ||
} | ||
|
||
func (si configuredInput) Test(ctx v2.TestContext) error { | ||
return si.input.Test(ctx) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package stateless_test | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"runtime" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
v2 "github.com/elastic/beats/v7/filebeat/input/v2" | ||
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" | ||
"github.com/elastic/beats/v7/libbeat/beat" | ||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/common/atomic" | ||
pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" | ||
) | ||
|
||
type fakeStatelessInput struct { | ||
OnTest func(v2.TestContext) error | ||
OnRun func(v2.Context, stateless.Publisher) error | ||
} | ||
|
||
func TestStateless_Run(t *testing.T) { | ||
t.Run("events are published", func(t *testing.T) { | ||
const numEvents = 5 | ||
|
||
ch := make(chan beat.Event) | ||
connector := pubtest.ConstClient(pubtest.ChClient(ch)) | ||
|
||
input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{ | ||
OnRun: func(ctx v2.Context, publisher stateless.Publisher) error { | ||
defer close(ch) | ||
for i := 0; i < numEvents; i++ { | ||
publisher.Publish(beat.Event{Fields: map[string]interface{}{"id": i}}) | ||
} | ||
return nil | ||
}, | ||
}), nil) | ||
|
||
var err error | ||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
err = input.Run(v2.Context{}, connector) | ||
}() | ||
|
||
var receivedEvents int | ||
for range ch { | ||
receivedEvents++ | ||
} | ||
wg.Wait() | ||
|
||
require.NoError(t, err) | ||
require.Equal(t, numEvents, receivedEvents) | ||
}) | ||
|
||
t.Run("capture panic and return error", func(t *testing.T) { | ||
input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{ | ||
OnRun: func(_ v2.Context, _ stateless.Publisher) error { | ||
panic("oops") | ||
}, | ||
}), nil) | ||
|
||
var clientCounters pubtest.ClientCounter | ||
err := input.Run(v2.Context{}, clientCounters.BuildConnector()) | ||
|
||
require.Error(t, err) | ||
require.Equal(t, 1, clientCounters.Total()) | ||
require.Equal(t, 0, clientCounters.Active()) | ||
}) | ||
|
||
t.Run("publisher unblocks if shutdown signal is send", func(t *testing.T) { | ||
// the input blocks in the publisher. We loop until the shutdown signal is received | ||
var started atomic.Bool | ||
input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{ | ||
OnRun: func(ctx v2.Context, publisher stateless.Publisher) error { | ||
for ctx.Cancelation.Err() == nil { | ||
started.Store(true) | ||
publisher.Publish(beat.Event{ | ||
Fields: common.MapStr{ | ||
"hello": "world", | ||
}, | ||
}) | ||
} | ||
return ctx.Cancelation.Err() | ||
}, | ||
}), nil) | ||
|
||
// connector creates a client the blocks forever until the shutdown signal is received | ||
var publishCalls atomic.Int | ||
connector := pubtest.FakeConnector{ | ||
ConnectFunc: func(config beat.ClientConfig) (beat.Client, error) { | ||
return &pubtest.FakeClient{ | ||
PublishFunc: func(event beat.Event) { | ||
publishCalls.Inc() | ||
<-config.CloseRef.Done() | ||
}, | ||
}, nil | ||
}, | ||
} | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
var wg sync.WaitGroup | ||
var err error | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
err = input.Run(v2.Context{Cancelation: ctx}, connector) | ||
}() | ||
|
||
// signal and wait for shutdown | ||
for !started.Load() { | ||
runtime.Gosched() | ||
} | ||
cancel() | ||
wg.Wait() | ||
|
||
// validate | ||
require.Equal(t, context.Canceled, err) | ||
require.Equal(t, 1, publishCalls.Load()) | ||
}) | ||
|
||
t.Run("do not start input of pipeline connection fails", func(t *testing.T) { | ||
errOpps := errors.New("oops") | ||
connector := pubtest.FailingConnector(errOpps) | ||
|
||
var run atomic.Int | ||
input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{ | ||
OnRun: func(_ v2.Context, publisher stateless.Publisher) error { | ||
run.Inc() | ||
return nil | ||
}, | ||
}), nil) | ||
|
||
err := input.Run(v2.Context{}, connector) | ||
require.True(t, errors.Is(err, errOpps)) | ||
require.Equal(t, 0, run.Load()) | ||
}) | ||
} | ||
|
||
func (f *fakeStatelessInput) Name() string { return "test" } | ||
|
||
func (f *fakeStatelessInput) Test(ctx v2.TestContext) error { | ||
if f.OnTest != nil { | ||
return f.OnTest(ctx) | ||
} | ||
return nil | ||
} | ||
|
||
func (f *fakeStatelessInput) Run(ctx v2.Context, publish stateless.Publisher) error { | ||
if f.OnRun != nil { | ||
return f.OnRun(ctx, publish) | ||
} | ||
return errors.New("oops, run not implemented") | ||
} | ||
|
||
func createConfiguredInput(t *testing.T, manager stateless.InputManager, config map[string]interface{}) v2.Input { | ||
input, err := manager.Create(common.MustNewConfigFrom(config)) | ||
require.NoError(t, err) | ||
return input | ||
} | ||
|
||
func constInputManager(input stateless.Input) stateless.InputManager { | ||
return stateless.NewInputManager(constInput(input)) | ||
} | ||
|
||
func constInput(input stateless.Input) func(*common.Config) (stateless.Input, error) { | ||
return func(_ *common.Config) (stateless.Input, error) { | ||
return input, nil | ||
} | ||
} |