Skip to content

Commit

Permalink
Input v2 compatibility layer (elastic#19401)
Browse files Browse the repository at this point in the history
(cherry picked from commit c9a9bf5)
  • Loading branch information
Steffen Siering committed Jul 8, 2020
1 parent 9c79938 commit 10b9d3d
Show file tree
Hide file tree
Showing 4 changed files with 552 additions and 0 deletions.
153 changes: 153 additions & 0 deletions filebeat/input/v2/compat/compat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package compat provides helpers for integrating the input/v2 API with
// existing input based features like autodiscovery, config file reloading, or
// filebeat modules.
package compat

import (
"fmt"
"sync"

"github.com/mitchellh/hashstructure"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert"
)

// factory implements the cfgfile.RunnerFactory interface and wraps the
// v2.Loader to create cfgfile.Runner instances based on available v2 inputs.
type factory struct {
log *logp.Logger
info beat.Info
loader *v2.Loader
}

// runner wraps a v2.Input, starting a go-routine
// On start the runner spawns a go-routine that will call (v2.Input).Run with
// the `sig` setup for shutdown signaling.
// On stop the runner triggers the shutdown signal and waits until the input
// has returned.
type runner struct {
id string
log *logp.Logger
agent *beat.Info
wg sync.WaitGroup
sig *concert.OnceSignaler
input v2.Input
connector beat.PipelineConnector
}

// RunnerFactory creates a cfgfile.RunnerFactory from an input Loader that is
// compatible with config file based input reloading, autodiscovery, and filebeat modules.
// The RunnerFactory is can be used to integrate v2 inputs into existing Beats.
func RunnerFactory(
log *logp.Logger,
info beat.Info,
loader *v2.Loader,
) cfgfile.RunnerFactory {
return &factory{log: log, info: info, loader: loader}
}

func (f *factory) CheckConfig(cfg *common.Config) error {
_, err := f.loader.Configure(cfg)
if err != nil {
return err
}
return nil
}

func (f *factory) Create(
p beat.PipelineConnector,
config *common.Config,
) (cfgfile.Runner, error) {
input, err := f.loader.Configure(config)
if err != nil {
return nil, err
}

id, err := configID(config)
if err != nil {
return nil, err
}

return &runner{
id: id,
log: f.log.Named(input.Name()),
agent: &f.info,
sig: concert.NewOnceSignaler(),
input: input,
connector: p,
}, nil
}

func (r *runner) String() string { return r.input.Name() }

func (r *runner) Start() {
log := r.log
name := r.input.Name()

go func() {
log.Infof("Input %v starting", name)
err := r.input.Run(
v2.Context{
ID: r.id,
Agent: *r.agent,
Logger: log,
Cancelation: r.sig,
},
r.connector,
)
if err != nil {
log.Errorf("Input '%v' failed with: %+v", name, err)
} else {
log.Infof("Input '%v' stopped", name)
}
}()
}

func (r *runner) Stop() {
r.sig.Trigger()
r.wg.Wait()
r.log.Infof("Input '%v' stopped", r.input.Name())
}

func configID(config *common.Config) (string, error) {
tmp := struct {
ID string `config:"id"`
}{}
if err := config.Unpack(&tmp); err != nil {
return "", fmt.Errorf("error extracting ID: %w", err)
}
if tmp.ID != "" {
return tmp.ID, nil
}

var h map[string]interface{}
config.Unpack(&h)
id, err := hashstructure.Hash(h, nil)
if err != nil {
return "", fmt.Errorf("can not compute id from configuration: %w", err)
}

return fmt.Sprintf("%16X", id), nil
}
120 changes: 120 additions & 0 deletions filebeat/input/v2/compat/compat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package compat

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/filebeat/input/v2/internal/inputest"
)

func TestRunnerFactory_CheckConfig(t *testing.T) {
t.Run("does not run or test configured input", func(t *testing.T) {
log := logp.NewLogger("test")
var countConfigure, countTest, countRun int

// setup
plugins := inputest.SinglePlugin("test", &inputest.MockInputManager{
OnConfigure: func(_ *common.Config) (v2.Input, error) {
countConfigure++
return &inputest.MockInput{
OnTest: func(_ v2.TestContext) error { countTest++; return nil },
OnRun: func(_ v2.Context, _ beat.PipelineConnector) error { countRun++; return nil },
}, nil
},
})
loader := inputest.MustNewTestLoader(t, plugins, "type", "test")
factory := RunnerFactory(log, beat.Info{}, loader.Loader)

// run
err := factory.CheckConfig(common.NewConfig())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// validate: configured an input, but do not run test or run
assert.Equal(t, 1, countConfigure)
assert.Equal(t, 0, countTest)
assert.Equal(t, 0, countRun)
})

t.Run("fail if input type is unknown to loader", func(t *testing.T) {
log := logp.NewLogger("test")
plugins := inputest.SinglePlugin("test", inputest.ConstInputManager(nil))
loader := inputest.MustNewTestLoader(t, plugins, "type", "")
factory := RunnerFactory(log, beat.Info{}, loader.Loader)

// run
err := factory.CheckConfig(common.MustNewConfigFrom(map[string]interface{}{
"type": "unknown",
}))
assert.Error(t, err)
})
}

func TestRunnerFactory_CreateAndRun(t *testing.T) {
t.Run("runner can correctly start and stop inputs", func(t *testing.T) {
log := logp.NewLogger("test")
var countRun int
var wg sync.WaitGroup
plugins := inputest.SinglePlugin("test", inputest.ConstInputManager(&inputest.MockInput{
OnRun: func(ctx v2.Context, _ beat.PipelineConnector) error {
defer wg.Done()
countRun++
<-ctx.Cancelation.Done()
return nil
},
}))
loader := inputest.MustNewTestLoader(t, plugins, "type", "test")
factory := RunnerFactory(log, beat.Info{}, loader.Loader)

runner, err := factory.Create(nil, common.MustNewConfigFrom(map[string]interface{}{
"type": "test",
}))
require.NoError(t, err)

wg.Add(1)
runner.Start()
runner.Stop()
wg.Wait()
assert.Equal(t, 1, countRun)
})

t.Run("fail if input type is unknown to loader", func(t *testing.T) {
log := logp.NewLogger("test")
plugins := inputest.SinglePlugin("test", inputest.ConstInputManager(nil))
loader := inputest.MustNewTestLoader(t, plugins, "type", "")
factory := RunnerFactory(log, beat.Info{}, loader.Loader)

// run
runner, err := factory.Create(nil, common.MustNewConfigFrom(map[string]interface{}{
"type": "unknown",
}))
assert.Nil(t, runner)
assert.Error(t, err)
})
}
77 changes: 77 additions & 0 deletions filebeat/input/v2/compat/composed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package compat

import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
)

// composeFactory combines to factories. Instances are created using the Combine function.
// For each operation the configured factory will be tried first. If the
// operation failed (for example the input type is unknown) the fallback factory is tried.
type composeFactory struct {
factory cfgfile.RunnerFactory
fallback cfgfile.RunnerFactory
}

// Combine takes two RunnerFactory instances and creates a new RunnerFactory.
// The new factory will first try to create an input using factory. If this operation fails fallback will be used.
//
// The new RunnerFactory will return the error of fallback only if factory did
// signal that the input type is unknown via v2.ErrUnknown.
//
// XXX: This RunnerFactory is used for combining the v2.Loader with the
// existing RunnerFactory for inputs in Filebeat. The Combine function should be removed once the old RunnerFactory is removed.
func Combine(factory, fallback cfgfile.RunnerFactory) cfgfile.RunnerFactory {
return composeFactory{factory: factory, fallback: fallback}
}

func (f composeFactory) CheckConfig(cfg *common.Config) error {
err := f.factory.CheckConfig(cfg)
if !v2.IsUnknownInputError(err) {
return err
}
return f.fallback.CheckConfig(cfg)
}

func (f composeFactory) Create(
p beat.PipelineConnector,
config *common.Config,
) (cfgfile.Runner, error) {
var runner cfgfile.Runner
var err1, err2 error

runner, err1 = f.factory.Create(p, config)
if err1 == nil {
return runner, nil
}

runner, err2 = f.fallback.Create(p, config)
if err2 == nil {
return runner, nil
}

// return err2 only if err1 indicates that the input type is not known to f.factory
if v2.IsUnknownInputError(err1) {
return nil, err2
}
return nil, err1
}
Loading

0 comments on commit 10b9d3d

Please sign in to comment.