-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This change adds the package filebeat/input/v2. It introduces the interfaces for the v2 input API only. The integration with filebat, actual InputManagers and input implementations will follow. This is the first PR introducing the new API. The current state of the full implementation can be seen [here](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2) and [sample inputs based on the new API](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/features/input). The full list of changes will include: - Introduce v2 API interfaces - Introduce [compatibility layer](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/compat) to integrate API with existing functionality - Introduce helpers for writing [stateless](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/input/v2/input-stateless/stateless.go) inputs. - Introduce helpers for writing [inputs that store a state](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/input-cursor) between restarts. - Integrate new API with [existing inputs and modules](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/beater/filebeat.go#L301) in filebeat. This PR only introduces the v2 API interfaces.
- Loading branch information
Steffen Siering
authored
Jun 18, 2020
1 parent
a9d457b
commit 27e28c8
Showing
9 changed files
with
900 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package v2 | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"strings" | ||
) | ||
|
||
// LoadError is returned by Loaders in case of failures. | ||
type LoadError struct { | ||
// Name of input/module that failed to load (if applicable) | ||
Name string | ||
|
||
// Reason why the loader failed. Can either be the cause reported by the | ||
// Plugin or some other indicator like ErrUnknown | ||
Reason error | ||
|
||
// (optional) Message to report in additon. | ||
Message string | ||
} | ||
|
||
// SetupError indicates that the loader initialization has detected | ||
// errors in individual plugin configurations or duplicates. | ||
type SetupError struct { | ||
Fails []error | ||
} | ||
|
||
// ErrUnknownInput indicates that the plugin type does not exist. Either | ||
// because the 'type' setting name does not match the loaders expectations, | ||
// or because the type is unknown. | ||
var ErrUnknownInput = errors.New("unknown input type") | ||
|
||
// ErrNoInputConfigured indicates that the 'type' setting is missing. | ||
var ErrNoInputConfigured = errors.New("no input type configured") | ||
|
||
// ErrPluginWithoutName reports that the operation failed because | ||
// the plugin is required to have a Name. | ||
var ErrPluginWithoutName = errors.New("the plugin has no name") | ||
|
||
// IsUnknownInputError checks if an error value indicates an input load | ||
// error because there is no existing plugin that can create the input. | ||
func IsUnknownInputError(err error) bool { return errors.Is(err, ErrUnknownInput) } | ||
|
||
func failedInputName(err error) string { | ||
switch e := err.(type) { | ||
case *LoadError: | ||
return e.Name | ||
default: | ||
return "" | ||
} | ||
} | ||
|
||
// Unwrap returns the reason if present | ||
func (e *LoadError) Unwrap() error { return e.Reason } | ||
|
||
// Error returns the errors string repesentation | ||
func (e *LoadError) Error() string { | ||
var buf strings.Builder | ||
|
||
if e.Message != "" { | ||
buf.WriteString(e.Message) | ||
} else if e.Name != "" { | ||
buf.WriteString("failed to load ") | ||
buf.WriteString(e.Name) | ||
} | ||
|
||
if e.Reason != nil { | ||
if buf.Len() > 0 { | ||
buf.WriteString(": ") | ||
} | ||
fmt.Fprintf(&buf, "%v", e.Reason) | ||
} | ||
|
||
if buf.Len() == 0 { | ||
return "<loader error>" | ||
} | ||
return buf.String() | ||
} | ||
|
||
// Error returns the errors string repesentation | ||
func (e *SetupError) Error() string { | ||
var buf strings.Builder | ||
buf.WriteString("invalid plugin setup found:") | ||
for _, err := range e.Fails { | ||
fmt.Fprintf(&buf, "\n\t%v", err) | ||
} | ||
return buf.String() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package v2 | ||
|
||
import ( | ||
"github.com/elastic/beats/v7/libbeat/beat" | ||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/logp" | ||
|
||
"github.com/elastic/go-concert/unison" | ||
) | ||
|
||
// InputManager creates and maintains actions and background processes for an | ||
// input type. | ||
// The InputManager is used to create inputs. The InputManager can provide | ||
// additional functionality like coordination between input of the same type, | ||
// custom functionality for querying or caching shared information, application | ||
// of common settings not unique to a particular input type, or require a more | ||
// specific Input interface to be implemented by the actual input. | ||
type InputManager interface { | ||
// Init signals to InputManager to initialize internal resources. | ||
// The mode tells the input manager if the Beat is actually running the inputs or | ||
// if inputs are only configured for testing/validation purposes. | ||
Init(grp unison.Group, mode Mode) error | ||
|
||
// Creates builds a new Input instance from the given configuation, or returns | ||
// an error if the configuation is invalid. | ||
// The input must establish any connection for data collection yet. The Beat | ||
// will use the Test/Run methods of the input. | ||
Create(*common.Config) (Input, error) | ||
} | ||
|
||
// Mode tells the InputManager in which mode it is initialized. | ||
type Mode uint8 | ||
|
||
//go:generate stringer -type Mode -trimprefix Mode | ||
const ( | ||
ModeRun Mode = iota | ||
ModeTest | ||
ModeOther | ||
) | ||
|
||
// Input is a configured input object that can be used to test or start | ||
// the actual data collection. | ||
type Input interface { | ||
// Name reports the input name. | ||
// | ||
// XXX: check if/how we can remove this method. Currently it is required for | ||
// compatibility reasons with existing interfaces in libbeat, autodiscovery | ||
// and filebeat. | ||
Name() string | ||
|
||
// Test checks the configuaration and runs additional checks if the Input can | ||
// actually collect data for the given configuration (e.g. check if host/port or files are | ||
// accessible). | ||
Test(TestContext) error | ||
|
||
// Run starts the data collection. Run must return an error only if the | ||
// error is fatal making it impossible for the input to recover. | ||
Run(Context, beat.PipelineConnector) error | ||
} | ||
|
||
// Context provides the Input Run function with common environmental | ||
// information and services. | ||
type Context struct { | ||
// Logger provides a structured logger to inputs. The logger is initialized | ||
// with labels that will identify logs for the input. | ||
Logger *logp.Logger | ||
|
||
// The input ID. | ||
ID string | ||
|
||
// Agent provides additional Beat info like instance ID or beat name. | ||
Agent beat.Info | ||
|
||
// Cancelation is used by Beats to signal the input to shutdown. | ||
Cancelation Canceler | ||
} | ||
|
||
// TestContext provides the Input Test function with common environmental | ||
// information and services. | ||
type TestContext struct { | ||
// Logger provides a structured logger to inputs. The logger is initialized | ||
// with labels that will identify logs for the input. | ||
Logger *logp.Logger | ||
|
||
// Agent provides additional Beat info like instance ID or beat name. | ||
Agent beat.Info | ||
|
||
// Cancelation is used by Beats to signal the input to shutdown. | ||
Cancelation Canceler | ||
} | ||
|
||
// Canceler is used to provide shutdown handling to the Context. | ||
type Canceler interface { | ||
Done() <-chan struct{} | ||
Err() error | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package v2 | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/feature" | ||
"github.com/elastic/beats/v7/libbeat/logp" | ||
"github.com/elastic/go-concert/unison" | ||
) | ||
|
||
// Loader can be used to create Inputs from configurations. | ||
// The loader is initialized with a list of plugins, and finds the correct plugin | ||
// when a configuration is passed to Configure. | ||
type Loader struct { | ||
log *logp.Logger | ||
registry map[string]Plugin | ||
typeField string | ||
defaultType string | ||
} | ||
|
||
// NewLoader creates a new Loader for configuring inputs from a slice if plugins. | ||
// NewLoader returns a SetupError if invalid plugin configurations or duplicates in the slice are detected. | ||
// The Loader will read the plugin name from the configuration object as is | ||
// configured by typeField. If typeField is empty, it defaults to "type". | ||
func NewLoader(log *logp.Logger, plugins []Plugin, typeField, defaultType string) (*Loader, error) { | ||
if typeField == "" { | ||
typeField = "type" | ||
} | ||
|
||
if errs := validatePlugins(plugins); len(errs) > 0 { | ||
return nil, &SetupError{errs} | ||
} | ||
|
||
registry := make(map[string]Plugin, len(plugins)) | ||
for _, p := range plugins { | ||
registry[p.Name] = p | ||
} | ||
|
||
return &Loader{ | ||
log: log, | ||
registry: registry, | ||
typeField: typeField, | ||
defaultType: defaultType, | ||
}, nil | ||
} | ||
|
||
// Init runs Init on all InputManagers for all plugins known to the loader. | ||
func (l *Loader) Init(group unison.Group, mode Mode) error { | ||
for _, p := range l.registry { | ||
if err := p.Manager.Init(group, mode); err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// Configure creates a new input from a Config object. | ||
// The loader reads the input type name from the cfg object and tries to find a | ||
// matching plugin. If a plugin is found, the plugin it's InputManager is used to create | ||
// the input. | ||
// Returns a LoadError if the input name can not be read from the config or if | ||
// the type does not exist. Error values for Ccnfiguration errors do depend on | ||
// the InputManager. | ||
func (l *Loader) Configure(cfg *common.Config) (Input, error) { | ||
name, err := cfg.String(l.typeField, -1) | ||
if err != nil { | ||
if l.defaultType == "" { | ||
return nil, &LoadError{ | ||
Reason: ErrNoInputConfigured, | ||
Message: fmt.Sprintf("%v setting is missing", l.typeField), | ||
} | ||
} | ||
name = l.defaultType | ||
} | ||
|
||
p, exists := l.registry[name] | ||
if !exists { | ||
return nil, &LoadError{Name: name, Reason: ErrUnknownInput} | ||
} | ||
|
||
log := l.log.With("input", name, "stability", p.Stability, "deprecated", p.Deprecated) | ||
switch p.Stability { | ||
case feature.Experimental: | ||
log.Warnf("EXPERIMENTAL: The %v input is experimental", name) | ||
case feature.Beta: | ||
log.Warnf("BETA: The %v input is beta", name) | ||
} | ||
if p.Deprecated { | ||
log.Warnf("DEPRECATED: The %v input is deprecated", name) | ||
} | ||
|
||
return p.Manager.Create(cfg) | ||
} | ||
|
||
// validatePlugins checks if there are multiple plugins with the same name in | ||
// the registry. | ||
func validatePlugins(plugins []Plugin) []error { | ||
var errs []error | ||
|
||
counts := map[string]int{} | ||
for _, p := range plugins { | ||
counts[p.Name]++ | ||
if err := p.validate(); err != nil { | ||
errs = append(errs, err) | ||
} | ||
} | ||
|
||
for name, count := range counts { | ||
if count > 1 { | ||
errs = append(errs, fmt.Errorf("plugin '%v' found %v times", name, count)) | ||
} | ||
} | ||
return errs | ||
} |
Oops, something went wrong.