diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index f6cb1eccdbf..2f5f9dd0b38 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -21,6 +21,7 @@ package compat import ( + "fmt" "sync" v2 "github.com/elastic/beats/v7/filebeat/input/v2" @@ -29,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/go-concert" + "github.com/mitchellh/hashstructure" ) // factory implements the cfgfile.RunnerFactory interface and wraps the @@ -45,6 +47,7 @@ type factory struct { // On stop the runner triggers the shutdown signal and waits until the input // has returned. type runner struct { + id string log *logp.Logger agent *beat.Info wg sync.WaitGroup @@ -81,7 +84,13 @@ func (f *factory) Create( return nil, err } + id, err := configID(config) + if err != nil { + return nil, err + } + return &runner{ + id: id, log: f.log.Named(input.Name()), agent: &f.info, sig: concert.NewOnceSignaler(), @@ -100,7 +109,7 @@ func (r *runner) Start() { log.Infof("Input %v starting", name) err := r.input.Run( v2.Context{ - ID: "", // TODO: hmmm.... + ID: r.id, Agent: *r.agent, Logger: log, Cancelation: r.sig, @@ -120,3 +129,24 @@ func (r *runner) Stop() { r.wg.Wait() r.log.Infof("Input '%v' stopped", r.input.Name()) } + +func configID(config *common.Config) (string, error) { + tmp := struct { + ID string `config:"id"` + }{} + if err := config.Unpack(&tmp); err != nil { + return "", fmt.Errorf("error extracting ID: %w", err) + } + if tmp.ID != "" { + return tmp.ID, nil + } + + var h map[string]interface{} + config.Unpack(&h) + id, err := hashstructure.Hash(h, nil) + if err != nil { + return "", fmt.Errorf("can not compute id from configuration: %w", err) + } + + return fmt.Sprintf("%16X", id), nil +} diff --git a/filebeat/input/v2/compat/composed.go b/filebeat/input/v2/compat/composed.go index 1589dabdd1f..26c274e8891 100644 --- a/filebeat/input/v2/compat/composed.go +++ b/filebeat/input/v2/compat/composed.go @@ -32,15 +32,13 @@ type composeFactory struct { fallback cfgfile.RunnerFactory } -var _ cfgfile.RunnerFactory = composeFactory{} - // Combine takes two RunnerFactory instances and creates a new RunnerFactory. // The new factory will first try to create an input using factory. If this operation fails fallback will be used. // // The new RunnerFactory will return the error of fallback only if factory did // signal that the input type is unknown via v2.ErrUnknown. // -// XXX: This RunnerFactory is used for compining the v2.Loader with the +// XXX: This RunnerFactory is used for combining the v2.Loader with the // existing RunnerFactory for inputs in Filebeat. The Combine function should be removed once the old RunnerFactory is removed. func Combine(factory, fallback cfgfile.RunnerFactory) cfgfile.RunnerFactory { return composeFactory{factory: factory, fallback: fallback}